From 660dd33369b27dae3c1b5ddfc08fd5527f1d994f Mon Sep 17 00:00:00 2001 From: Arseniy Kuznetsov Date: Fri, 30 Dec 2022 19:11:07 +0100 Subject: [PATCH] simple queue, parallel router fetch as_completed , config autoupdate fixes --- mktxp/cli/config/config.py | 35 +++++++++++++------- mktxp/collector/queue_collector.py | 52 ++++++++++++++---------------- mktxp/datasource/queue_ds.py | 42 +++++++++++++----------- mktxp/flow/collector_handler.py | 12 +++---- mktxp/flow/collector_registry.py | 2 ++ mktxp/flow/router_entry.py | 1 + 6 files changed, 80 insertions(+), 64 deletions(-) diff --git a/mktxp/cli/config/config.py b/mktxp/cli/config/config.py index ad943e6..1b4e677 100755 --- a/mktxp/cli/config/config.py +++ b/mktxp/cli/config/config.py @@ -247,13 +247,14 @@ class MKTXPConfigHandler: def _config_entry_reader(self, entry_name): config_entry_reader = {} - write_needed = False + new_keys = [] + for key in MKTXPConfigKeys.BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.BOOLEAN_KEYS_YES): if self.config[entry_name].get(key): config_entry_reader[key] = self.config[entry_name].as_bool(key) else: config_entry_reader[key] = True if key in MKTXPConfigKeys.BOOLEAN_KEYS_YES else False - write_needed = True # read from disk next time + new_keys.append(key) # read from disk next time for key in MKTXPConfigKeys.STR_KEYS: config_entry_reader[key] = self.config[entry_name][key] @@ -267,25 +268,31 @@ class MKTXPConfigHandler: else: config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self._default_value_for_key( MKTXPConfigKeys.SSL_KEY, config_entry_reader[MKTXPConfigKeys.SSL_KEY]) - write_needed = True # read from disk next time - - if write_needed: + new_keys.append(MKTXPConfigKeys.PORT_KEY) # read from disk next time + + if new_keys: self.config[entry_name] = config_entry_reader - self.config.write() + try: + self.config.write() + if self._config[MKTXPConfigKeys.MKTXP_CONFIG_ENTRY_NAME].as_bool(MKTXPConfigKeys.MKTXP_VERBOSE_MODE): + print(f'Updated router entry {entry_name} with new feature keys {new_keys}') + except Exception as exc: + print(f'Error updating router entry {entry_name} with new feature keys {new_keys}: {exc}') + print('Please update mktxp.conf to its latest version manually') return config_entry_reader def _system_entry_reader(self): system_entry_reader = {} entry_name = MKTXPConfigKeys.MKTXP_CONFIG_ENTRY_NAME - write_needed = False + new_keys = [] for key in MKTXPConfigKeys.MKTXP_INT_KEYS: if self._config[entry_name].get(key): system_entry_reader[key] = self._config[entry_name].as_int(key) else: system_entry_reader[key] = self._default_value_for_key(key) - write_needed = True # read from disk next time + new_keys.append(key) # read from disk next time for key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES): if self._config[entry_name].get(key): @@ -293,11 +300,17 @@ class MKTXPConfigHandler: key) else: system_entry_reader[key] = True if key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES else False - write_needed = True # read from disk next time + new_keys.append(key) # read from disk next time - if write_needed: + if new_keys: self._config[entry_name] = system_entry_reader - self._config.write() + try: + self._config.write() + if self._config[entry_name].as_bool(MKTXPConfigKeys.MKTXP_VERBOSE_MODE): + print(f'Updated system entry {entry_name} with new system keys {new_keys}') + except Exception as exc: + print(f'Error updating system entry {entry_name} with new system keys {new_keys}: {exc}') + print('Please update _mktxp.conf to its latest version manually') return system_entry_reader diff --git a/mktxp/collector/queue_collector.py b/mktxp/collector/queue_collector.py index 46344af..f680880 100644 --- a/mktxp/collector/queue_collector.py +++ b/mktxp/collector/queue_collector.py @@ -13,75 +13,71 @@ from mktxp.collector.base_collector import BaseCollector -from mktxp.datasource.queue_ds import QueueTreeMetricsDataSource +from mktxp.datasource.queue_ds import QueueMetricsDataSource class QueueTreeCollector(BaseCollector): '''Queue Tree collector''' @staticmethod def collect(router_entry): - if not router_entry.config_entry.installed_packages: + if not router_entry.config_entry.queue: return - qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'packets', 'queued_bytes', 'queued_packets','dropped', 'rate', 'packet_rate', 'disabled'] - qt_records = QueueTreeMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels) + qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'queued_bytes', 'dropped', 'rate', 'disabled'] + qt_records = QueueMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels, kind = 'tree') if qt_records: qt_rate_metric = BaseCollector.counter_collector('queue_tree_rates', 'Average passing data rate in bytes per second', qt_records, 'rate', ['name']) yield qt_rate_metric - qt_packet_rate_metric = BaseCollector.counter_collector('queue_tree_packet_rates', 'Average passing data rate in packets per second', qt_records, 'packet_rate', ['name']) - yield qt_packet_rate_metric - qt_byte_metric = BaseCollector.counter_collector('queue_tree_bytes', 'Number of processed bytes', qt_records, 'bytes', ['name']) yield qt_byte_metric - qt_packet_metric = BaseCollector.counter_collector('queue_tree_pakets', 'Number of processed packets', qt_records, 'packets', ['name']) - yield qt_packet_metric - qt_queued_metric = BaseCollector.counter_collector('queue_tree_queued_bytes', 'Number of queued bytes', qt_records, 'queued_bytes', ['name']) yield qt_queued_metric - qt_queued_packets_metric = BaseCollector.counter_collector('queue_tree_queued_packets', 'Number of queued packets', qt_records, 'queued_packets', ['name']) - yield qt_queued_packets_metric - - - qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped packets', qt_records, 'dropped', ['name']) + qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped bytes', qt_records, 'dropped', ['name']) yield qt_drop_metric -class SimpleCollector(BaseCollector): +class QueueSimpleCollector(BaseCollector): '''Simple Queue collector''' @staticmethod def collect(router_entry): - if not router_entry.config_entry.installed_packages: + if not router_entry.config_entry.queue: return qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'packets', 'queued_bytes', 'queued_packets','dropped', 'rate', 'packet_rate', 'disabled'] - qt_records = QueueTreeMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels) + qt_records = QueueMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels, kind = 'simple') if qt_records: - qt_rate_metric = BaseCollector.counter_collector('queue_tree_rates', 'Average passing data rate in bytes per second', qt_records, 'rate', ['name']) + qt_rate_metric = BaseCollector.counter_collector('queue_simple_rates_upload', 'Average passing upload data rate in bytes per second', qt_records, 'rate_up', ['name']) yield qt_rate_metric - qt_packet_rate_metric = BaseCollector.counter_collector('queue_tree_packet_rates', 'Average passing data rate in packets per second', qt_records, 'packet_rate', ['name']) - yield qt_packet_rate_metric + qt_rate_metric = BaseCollector.counter_collector('queue_simple_rates_download', 'Average passing download data rate in bytes per second', qt_records, 'rate_down', ['name']) + yield qt_rate_metric - qt_byte_metric = BaseCollector.counter_collector('queue_tree_bytes', 'Number of processed bytes', qt_records, 'bytes', ['name']) + + qt_byte_metric = BaseCollector.counter_collector('queue_simple_bytes_upload', 'Number of upload processed bytes', qt_records, 'bytes_up', ['name']) yield qt_byte_metric - qt_packet_metric = BaseCollector.counter_collector('queue_tree_pakets', 'Number of processed packets', qt_records, 'packets', ['name']) - yield qt_packet_metric + qt_byte_metric = BaseCollector.counter_collector('queue_simple_bytes_download', 'Number of download processed bytes', qt_records, 'bytes_down', ['name']) + yield qt_byte_metric - qt_queued_metric = BaseCollector.counter_collector('queue_tree_queued_bytes', 'Number of queued bytes', qt_records, 'queued_bytes', ['name']) + + qt_queued_metric = BaseCollector.counter_collector('queue_simple_queued_bytes_upload', 'Number of upload queued bytes', qt_records, 'queued_bytes_up', ['name']) yield qt_queued_metric - qt_queued_packets_metric = BaseCollector.counter_collector('queue_tree_queued_packets', 'Number of queued packets', qt_records, 'queued_packets', ['name']) - yield qt_queued_packets_metric + qt_queued_metric = BaseCollector.counter_collector('queue_simple_queued_bytes_downloadd', 'Number of download queued bytes', qt_records, 'queued_bytes_down', ['name']) + yield qt_queued_metric - qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped packets', qt_records, 'dropped', ['name']) + qt_drop_metric = BaseCollector.counter_collector('queue_simple_dropped_upload', 'Number of upload dropped bytes', qt_records, 'dropped_up', ['name']) + yield qt_drop_metric + + + qt_drop_metric = BaseCollector.counter_collector('queue_simple_dropped_download', 'Number of download dropped bytes', qt_records, 'dropped_down', ['name']) yield qt_drop_metric diff --git a/mktxp/datasource/queue_ds.py b/mktxp/datasource/queue_ds.py index 8634936..632a80b 100644 --- a/mktxp/datasource/queue_ds.py +++ b/mktxp/datasource/queue_ds.py @@ -15,31 +15,37 @@ from mktxp.datasource.base_ds import BaseDSProcessor -class QueueTreeMetricsDataSource: - ''' Queue Tree Metrics data provider +class QueueMetricsDataSource: + ''' Queue Metrics data provider ''' @staticmethod - def metric_records(router_entry, *, metric_labels = None): + def metric_records(router_entry, *, metric_labels = None, kind = 'tree'): if metric_labels is None: metric_labels = [] try: - queue_tree_records = router_entry.api_connection.router_api().get_resource('/queue/tree/').get() - return BaseDSProcessor.trimmed_records(router_entry, router_records = queue_tree_records, metric_labels = metric_labels) + queue_records = router_entry.api_connection.router_api().get_resource(f'/queue/{kind}/').get() + queue_records = BaseDSProcessor.trimmed_records(router_entry, router_records = queue_records, metric_labels = metric_labels) except Exception as exc: print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}') return None + if kind == 'tree': + return queue_records + + # simple queue records need splitting upload/download values + splitted_queue_records = [] + for queue_record in queue_records: + splitted_queue_record = {} + for key, value in queue_record.items(): + split_values = value.split('/') + if split_values and len(split_values) > 1: + splitted_queue_record[f'{key}_up'] = split_values[0] + splitted_queue_record[f'{key}_down'] = split_values[1] + else: + splitted_queue_record[key] = value + splitted_queue_records.append(splitted_queue_record) + return splitted_queue_records + + + -class SimpleQueueMetricsDataSource: - ''' Simple Queue Metrics data provider - ''' - @staticmethod - def metric_records(router_entry, *, metric_labels = None): - if metric_labels is None: - metric_labels = [] - try: - simple_queue_records = router_entry.api_connection.router_api().get_resource('/queue/simple/').get() - return BaseDSProcessor.trimmed_records(router_entry, router_records = simple_queue_records, metric_labels = metric_labels) - except Exception as exc: - print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}') - return None diff --git a/mktxp/flow/collector_handler.py b/mktxp/flow/collector_handler.py index 834ab41..c97a08f 100644 --- a/mktxp/flow/collector_handler.py +++ b/mktxp/flow/collector_handler.py @@ -11,7 +11,7 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed from timeit import default_timer from datetime import datetime from mktxp.cli.config.config import config_handler @@ -68,13 +68,11 @@ class CollectorHandler: continue # Publish the collection function as a future - future = executor.submit(self.collect_single, router_entry) - futures.append(future) + futures.append(executor.submit(self.collect_single, router_entry)) + + for future in as_completed(futures): + yield from future.result() - # Join all futures and collect their results - for future in futures: - results = future.result() - yield from results def collect(self): now = datetime.now().timestamp() diff --git a/mktxp/flow/collector_registry.py b/mktxp/flow/collector_registry.py index f9a0d5f..e4e9d34 100644 --- a/mktxp/flow/collector_registry.py +++ b/mktxp/flow/collector_registry.py @@ -34,6 +34,7 @@ from mktxp.collector.firewall_collector import FirewallCollector from mktxp.collector.mktxp_collector import MKTXPCollector from mktxp.collector.user_collector import UserCollector from mktxp.collector.queue_collector import QueueTreeCollector +from mktxp.collector.queue_collector import QueueSimpleCollector class CollectorRegistry: @@ -69,6 +70,7 @@ class CollectorRegistry: self.register('UserCollector', UserCollector.collect) self.register('QueueTreeCollector', QueueTreeCollector.collect) + self.register('QueueSimpleCollector', QueueSimpleCollector.collect) self.register('MKTXPCollector', MKTXPCollector.collect) diff --git a/mktxp/flow/router_entry.py b/mktxp/flow/router_entry.py index c572df9..cdd2f29 100644 --- a/mktxp/flow/router_entry.py +++ b/mktxp/flow/router_entry.py @@ -45,6 +45,7 @@ class RouterEntry: 'WLANCollector': 0, 'CapsmanCollector': 0, 'QueueTreeCollector': 0, + 'QueueSimpleCollector': 0, 'UserCollector': 0, 'MKTXPCollector': 0 }