Commit

cleanups
allisonkarlitskaya committed Jan 30, 2024
1 parent e22b701 commit 638b91a
Showing 1 changed file with 127 additions and 140 deletions.
267 changes: 127 additions & 140 deletions src/cockpit/channels/pcp.py
@@ -1,153 +1,145 @@
import sys
import os
# This file is part of Cockpit.
#
# Copyright (C) 2023 Red Hat, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

import asyncio
from collections import defaultdict
import glob
import json
import logging
from typing import Any, Dict, NamedTuple, Optional, List, Union, Set
import platform
import time
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Iterable, Mapping, Sequence

from ..channel import AsyncChannel, ChannelError
from ..jsonutil import JsonList, JsonObject
from cockpit._vendor.systemd_ctypes import Handle
from ..jsonutil import JsonObject, JsonValue, get_int, get_objv, get_str

if TYPE_CHECKING:
import cpmapi as c_api
from pcp import pmapi
else:
pmapi = None
c_api = None

logger = logging.getLogger(__name__)

from pcp import pmapi
import cpmapi as c_api

class PcpMetricInfo(NamedTuple):
name: str
derive: Optional[str]
def try_import_pcp() -> None:
global c_api, pmapi
if c_api is None or pmapi is None:
try:
import cpmapi as c_api
from pcp import pmapi
except ImportError as exc:
raise ChannelError('not-supported', message='python3-pcp not installed') from exc


class ArchiveInfo():
def __init__(self, context, start, path):
class PcpMetricInfo(dict[str, JsonValue]):
def __init__(self, value: JsonObject) -> None:
self.name = get_str(value, 'name')
self.derive = get_str(value, 'derive', None)
super().__init__(name=self.name, derive=self.derive)


class ArchiveInfo:
def __init__(self, context: Any, start: float, path: str) -> None:
self.context = context
self.start = start
self.path = path

# bleh, not like this
def __lt__(self, other):
return self.start < other.start
def sort_key(self) -> float:
return self.start

def __gt__(self, other):
return self.start > other.start

Sample = Mapping[str, float | list[float] | None]


class PcpMetricsChannel(AsyncChannel):
payload = 'metrics1'
# restrictions = [('source', 'pcp-archive')]
restrictions = [('source', 'pcp-archive')]

pcp_dir: str
archive_batch = 60

archives: List[ArchiveInfo]
metrics: List[PcpMetricInfo]
interval: int = 1000
archives: list[ArchiveInfo]
metrics: Sequence[PcpMetricInfo]
interval: int
need_meta: bool = True
start_timestamp: float = 0
start_timestamp: int
last_timestamp: float = 0
next_timestamp: float = 0
last_samples: Dict = defaultdict(lambda: None)
last_samples: Sample | None = None

async def run(self, options: JsonObject) -> None:
# TODO figure out
# if not all(module in sys.modules for module in ['pmapi', 'c_api']):
# self.try_import_pcp()
logger.debug('metrics pcp-archive open: %r, channel: %r', options, self.channel)

self.metrics = []
self.start_timestamp = get_int(options, 'timestamp', int(time.time() * 1000))
self.interval = get_int(options, 'interval', 1000)
self.metrics = get_objv(options, 'metrics', PcpMetricInfo)
if not self.metrics:
raise ChannelError('protocol-error', message='metrics list must not be empty')

self.parse_options(options)
try_import_pcp() # after parsing arguments

try:
self.prepare_archives()
except FileNotFoundError:
raise ChannelError('failed to open archives')
self.archives = sorted(self.prepare_archives(), key=ArchiveInfo.sort_key)
except FileNotFoundError as exc:
raise ChannelError('failed to open archives') from exc

self.ready()

self.sample_archives()

return

def try_import_pcp(self) -> None:
pass
# try:
# global pmapi
# pmapi = getattr(import_module('pcp'), 'pmapi')
# global c_api
# c_api = getattr(import_module('cpmapi'), 'c_api')
# except ImportError:
# raise ChannelError('not-supported', message='Pcp is not installed')

def parse_options(self, options: JsonObject):
logger.debug('metrics pcp-archive open: %s, channel: %s', options, self.channel)

timestamp = options.get('timestamp')
if not isinstance(timestamp, int):
logger.error('no "timestamp" was specified')
raise ChannelError('protocol-error', message='no "timestamp" was specified')

self.start_timestamp = timestamp

interval = options.get('interval')
if isinstance(interval, int):
self.interval = interval

metrics = options.get('metrics')
if not isinstance(metrics, list) or len(metrics) == 0:
logger.error('invalid "metrics" value: %s', metrics)
raise ChannelError('protocol-error', message='invalid "metrics" option was specified (not an array)')
while True:
self.sample_archives()

for metric in metrics:
name = metric.get('name')
derive = metric.get('derive')

self.metrics.append(PcpMetricInfo(name=name, derive=derive))
try:
await asyncio.wait_for(self.read(), self.interval / 1000)
return
except asyncio.TimeoutError:
# Continue the while loop, we use wait_for as an interval timer.
continue

@staticmethod
def float_to_timeval(timestamp: float) -> pmapi.timeval:
sec = int(timestamp / 1000)
usec = int((timestamp % 1000) * 1000)
return pmapi.timeval(sec, usec)

def add_archive(self, archive_path: str) -> ArchiveInfo:
context = pmapi.pmContext(c_api.PM_CONTEXT_ARCHIVE, archive_path)
log_label = context.pmGetArchiveLabel()
archive_start = float(log_label.start) * 1000

return ArchiveInfo(context=context, start=archive_start, path=archive_path)

def prepare_archives(self) -> None:
hostname = os.uname()[1]
@staticmethod
def prepare_archives() -> Iterable[ArchiveInfo]:
hostname = platform.node()
archive_dir = f'{pmapi.pmContext.pmGetConfig("PCP_LOG_DIR")}/pmlogger/{hostname}'
indexes = glob.glob(glob.escape(archive_dir) + '/*.index')

with Handle.open(archive_dir, os.O_RDONLY | os.O_DIRECTORY) as archives_fd:
archives: List[ArchiveInfo] = []

for file_name in os.listdir(archives_fd):
if file_name.endswith('.index'):
logger.debug(f'opening archive: {file_name}')
try:
archives.append(self.add_archive(f'{archive_dir}/{file_name}'))
except pmapi.pmErr as ex:
if ex.errno() != c_api.PM_ERR_LOGFILE:
raise ex
else:
continue

archives.sort()
self.archives = archives
for archive_path in indexes:
logger.debug('opening archive: %r', archive_path)
try:
context = pmapi.pmContext(c_api.PM_CONTEXT_ARCHIVE, archive_path)
log_label = context.pmGetArchiveLabel()
archive_start = float(log_label.start) * 1000
yield ArchiveInfo(context, archive_start, archive_path)
except pmapi.pmErr as exc:
if exc.errno() != c_api.PM_ERR_LOGFILE:
raise

def send_meta(self) -> None:
metrics: JsonList = []
for metricinfo in self.metrics:
metrics.append({
'name': metricinfo.name,
'derive': metricinfo.derive,
})

self.send_json(source='pcp-archive', interval=self.interval, timestamp=self.start_timestamp, metrics=metrics)
self.send_json(
source='pcp-archive', interval=self.interval, timestamp=self.start_timestamp, metrics=self.metrics
)
self.need_meta = False

def sample_archives(self) -> None:
@@ -160,24 +152,22 @@ def sample_archives(self) -> None:
continue

if timestamp < archive.start:
logging.debug("ligma balls")
logger.debug("ligma balls")
timestamp = int(archive.start)

context = archive.context
logging.debug(f'timestamp:\t\t{timestamp}')
logging.debug(f'archive_start:\t{archive.start}')
logging.debug(f'archive_end:\t{context.pmGetArchiveEnd()}')
logger.debug('timestamp: %r', timestamp)
logger.debug('archive_start: %r', archive.start)
logger.debug('archive_end: %r', context.pmGetArchiveEnd())
context.pmSetMode(c_api.PM_MODE_INTERP | c_api.PM_XTB_SET(c_api.PM_TIME_MSEC),
self.float_to_timeval(timestamp), self.interval)
self.sample(context)

def sample(self, current_context: pmapi.pmContext) -> None:
metrics = list(self.metrics)

pmids = current_context.pmLookupName([metric.name for metric in metrics])
pmids = current_context.pmLookupName([metric.name for metric in self.metrics])
descs = current_context.pmLookupDescs(pmids)

logging.debug('BEGIN SAMPLING')
logger.debug('BEGIN SAMPLING')
while True:
fetched = []
try:
@@ -187,23 +177,23 @@ def sample(self, current_context: pmapi.pmContext) -> None:

self.send_updates(fetched)
fetched.clear()
except pmapi.pmErr as ex:
logging.debug(f'Fetching error: {ex}\t\tfetched: {fetched}')
if ex.errno() != c_api.PM_ERR_EOL:
raise ex
except pmapi.pmErr as exc:
logger.debug('Fetching error: %r, fetched %r', exc, fetched)
if exc.errno() != c_api.PM_ERR_EOL:
raise
if len(fetched) > 0:
self.send_updates(fetched)

break

def parse_fetched_results(self, context: pmapi.pmContext, results: Any, descs: Any) -> Dict[str, Union[float, List[float]]]:
def parse_fetched_results(self, context: pmapi.pmContext, results: Any, descs: Any) -> Sample:
metrics = list(self.metrics)
samples = {}
samples: dict[str, float | list[float]] = {}

samples['timestamp'] = float(results.contents.timestamp)
for i in range(results.contents.numpmid):
values: Union[dict, float] = defaultdict()
instances: Optional[List[str]] = None
values: dict[str, float] | float = defaultdict()
instances: list[str] | None = None
value_count = results.contents.get_numval(i)

if value_count > 1:
@@ -228,47 +218,44 @@ def parse_fetched_results(self, context: pmapi.pmContext, results: Any, descs: A

return samples

def calculate_sample_rate(self, value: float, old_value: Optional[float]) -> Union[float, bool]:
def calculate_sample_rate(self, value: float, old_value: float | None) -> float | bool:
if old_value is not None and self.last_timestamp:
return (value - old_value) / (self.next_timestamp - self.last_timestamp)
else:
return False

def send_updates(self, samples: List[Dict]):
def send_updates(self, samples: Sequence[Sample]) -> None:
# data: List[List[Union[float, List[Optional[Union[float, bool]]]]]] = []
data: List[List[Union[float, List[float]]]] = []
last_samples = self.last_samples
data: list[list[float | list[float]]] = []
last_samples = self.last_samples or {}

for sample in samples:
assert isinstance(sample['timestamp'], float)
self.next_timestamp = sample['timestamp']
sampled_values: List[Union[float, List[float]]] = []
sampled_values: list[float | list[float]] = []
for metricinfo in self.metrics:
value = sample[metricinfo.name]
old_value = last_samples.get(metricinfo.name, None)

if isinstance(value, dict):
old_value = last_samples[metricinfo.name]
assert isinstance(value, dict)
if old_value == None:
old_value = {}
logger.debug('old %r new %r', old_value, value)

# If we have less or more keys the data changed, send a meta message.
if value.keys() != old_value.keys():
if isinstance(value, Mapping):
# If the old value wasn't an equivalent mapping, we need a meta
if not isinstance(old_value, Mapping) or value.keys() != old_value.keys():
self.need_meta = True
old_value = {}

if metricinfo.derive == 'rate':
instances: List[Optional[Union[float, bool]]] = []

for key, val in value.items():
instances.append(self.calculate_sample_rate(val, old_value.get(key)))

instances = tuple(self.calculate_sample_rate(value[key], old_value.get(key)) for key in value)
sampled_values.append(instances)
else:
sampled_values.append(list(value.values()))
sampled_values.append(tuple(value.values()))
else:
old_value = last_samples.get(metricinfo.name)
assert isinstance(value, float)
# hack because I need some default value to initialize old_values in the first round of sampling
if not isinstance(old_value, float | None):

# If the old value was a mapping, we need a meta
if isinstance(old_value, Mapping):
self.need_meta = True
old_value = None

if metricinfo.derive == 'rate':
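
The option parsing at the top of the new run() defines the open message this channel accepts. A minimal sketch of such a message, with invented field values (not part of this commit):

# Hypothetical 'open' control message for a metrics1 channel backed by
# pcp-archive.  run() above reads 'timestamp' (default: the current time),
# 'interval' (default: 1000 ms) and a non-empty 'metrics' list whose entries
# carry a 'name' and an optional 'derive'.
open_message = {
    'command': 'open',
    'channel': 'ch1',
    'payload': 'metrics1',
    'source': 'pcp-archive',
    'timestamp': 1706601600000,  # where to start sampling, ms since the epoch
    'interval': 1000,            # sampling interval, ms
    'metrics': [
        {'name': 'kernel.all.cpu.user', 'derive': 'rate'},
        {'name': 'mem.physmem'},  # no 'derive': send raw values
    ],
}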

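The while loop at the end of the new run() doubles as an interval timer: a read that times out means "no message from the peer yet, take another sample". A standalone sketch of that pattern, assuming read() completes when a message or EOF arrives (the helper names here are hypothetical):

import asyncio

async def sample_loop(read, take_sample, interval_ms: int) -> None:
    # Block on the peer for at most one interval per sample.  TimeoutError is
    # the interval tick; a completed read means the channel is shutting down.
    while True:
        take_sample()
        try:
            await asyncio.wait_for(read(), interval_ms / 1000)
            return
        except asyncio.TimeoutError:
            continue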
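For metrics opened with derive 'rate', calculate_sample_rate() is a plain difference quotient over archive time: with invented numbers, a counter sampled as 5000 at t = 1000 ms and 6500 at t = 2000 ms yields (6500 - 5000) / (2000 - 1000) = 1.5 units per millisecond. When there is no previous sample to difference against, it returns False rather than a number.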