Commit cf2b958

something seems to be working
tomasmatus committed Jan 29, 2024
1 parent 17e08e9 commit cf2b958
Showing 1 changed file with 85 additions and 67 deletions.
152 changes: 85 additions & 67 deletions src/cockpit/channels/pcp.py
@@ -7,40 +7,31 @@
from typing import Any, Dict, NamedTuple, Optional, List, Union, Set

Check notice — Code scanning / CodeQL: Unused import. Import of 'Set' is not used.
from ..channel import AsyncChannel, ChannelError
from ..jsonutil import JsonObject
from ..jsonutil import JsonList, JsonObject
from cockpit._vendor.systemd_ctypes import Handle

# TODO this can't be here
from pcp import pmapi
import cpmapi as c_api

import pdb

logger = logging.getLogger(__name__)

from pcp import pmapi
import cpmapi as c_api

class PcpMetricInfo(NamedTuple):
name: str
derive: Optional[str]


class ArchiveInfo(NamedTuple):
context: pmapi.pmContext
start: pmapi.timeval
path: str

def __int__(self):
return int(self.start.tv_sec * 1000 + self.start.tv_usec / 1000)
class ArchiveInfo():

Check warning — Code scanning / CodeQL: Incomplete ordering. Class ArchiveInfo implements __lt__ and __gt__, but does not implement __le__ or __ge__. (A sketch of one possible fix follows the class below.)
def __init__(self, context, start, path):
self.context = context
self.start = start
self.path = path

# bleh, not like this
def __lt__(self, other):
first = int(self)
second = int(other)
return first < second
return self.start < other.start

def __gt__(self, other):
first = int(self)
second = int(other)
return first > second
return self.start > other.start
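
The CodeQL warning above could be silenced with functools.total_ordering, which derives the missing comparison methods from __eq__ plus one ordering method. A minimal sketch of that shape — not what this commit does:

from functools import total_ordering

@total_ordering
class ArchiveInfo:
    def __init__(self, context, start, path):
        self.context = context
        self.start = start
        self.path = path

    # total_ordering derives __gt__, __le__ and __ge__ from these two
    def __eq__(self, other):
        return isinstance(other, ArchiveInfo) and self.start == other.start

    def __lt__(self, other):
        return self.start < other.start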


class PcpMetricsChannel(AsyncChannel):
@@ -54,12 +45,15 @@ class PcpMetricsChannel(AsyncChannel):
metrics: List[PcpMetricInfo]
interval: int = 1000
need_meta: bool = True
load_timestamp: pmapi.timeval
last_samples: Union[List[Union[float, List[float]]], None] = None
start_timestamp: float = 0
last_timestamp: float = 0
next_timestamp: float = 0
last_samples: Dict = defaultdict(lambda: None)

async def run(self, options: JsonObject) -> None:
if not all(module in sys.modules for module in ['pmapi', 'c_api']):
self.try_import_pcp()
# TODO figure out
# if not all(module in sys.modules for module in ['pmapi', 'c_api']):
# self.try_import_pcp()

self.metrics = []

@@ -74,17 +68,17 @@ async def run(self, options: JsonObject) -> None:

self.sample_archives()

# await asyncio.wait_for(self.read(), self.interval / 1000)
return

def try_import_pcp(self) -> None:
try:
global pmapi
from pcp import pmapi
global c_api
import cpmapi as c_api
except ImportError:
raise ChannelError('not-supported', message='Pcp is not installed')
pass
# try:
# global pmapi
# pmapi = getattr(import_module('pcp'), 'pmapi')
# global c_api
# c_api = getattr(import_module('cpmapi'), 'c_api')
# except ImportError:
# raise ChannelError('not-supported', message='Pcp is not installed')

Check notice — Code scanning / CodeQL: Commented-out code. This comment appears to contain commented-out code.
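
For reference, the guarded import that the commented-out block gestures at could look like the following — essentially the code this commit removes, so treat it as one plausible shape rather than the channel's final API:

def try_import_pcp(self) -> None:
    try:
        # 'global' makes the imports bind at module level
        global pmapi, c_api
        from pcp import pmapi
        import cpmapi as c_api
    except ImportError as exc:
        # surface a clean channel error instead of a bare traceback
        raise ChannelError('not-supported', message='pcp is not installed') from exc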

def parse_options(self, options: JsonObject):
logger.debug('metrics pcp-archive open: %s, channel: %s', options, self.channel)
@@ -94,7 +88,7 @@ def parse_options(self, options: JsonObject):
logger.error('no "timestamp" was specified')
raise ChannelError('protocol-error', message='no "timestamp" was specified')

self.load_timestamp = pmapi.timeval(sec=int(timestamp / 1000))
self.start_timestamp = timestamp

interval = options.get('interval')
if isinstance(interval, int):
@@ -112,10 +106,15 @@ def parse_options(self, options: JsonObject):
self.metrics.append(PcpMetricInfo(name=name, derive=derive))

@staticmethod
def add_archive(archive_path: str) -> ArchiveInfo:
def float_to_timeval(timestamp: float) -> pmapi.timeval:
sec = int(timestamp / 1000)
usec = int((timestamp % 1000) * 1000)
return pmapi.timeval(sec, usec)

def add_archive(self, archive_path: str) -> ArchiveInfo:
context = pmapi.pmContext(c_api.PM_CONTEXT_ARCHIVE, archive_path)
log_label = context.pmGetArchiveLabel()
archive_start = log_label.start
archive_start = float(log_label.start) * 1000

return ArchiveInfo(context=context, start=archive_start, path=archive_path)
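
For intuition about float_to_timeval above: timestamps in this file are floats of milliseconds, so 1500.25 ms splits into one second plus 500250 microseconds. A quick sanity check, assuming pmapi.timeval exposes tv_sec and tv_usec:

tv = PcpMetricsChannel.float_to_timeval(1500.25)
assert tv.tv_sec == 1        # int(1500.25 / 1000)
assert tv.tv_usec == 500250  # int((1500.25 % 1000) * 1000)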

@@ -128,6 +127,7 @@ def prepare_archives(self) -> None:

for file_name in os.listdir(archives_fd):
if file_name.endswith('.index'):
logger.debug(f'opening archive: {file_name}')
try:
archives.append(self.add_archive(f'{archive_dir}/{file_name}'))
except pmapi.pmErr as ex:
@@ -140,45 +140,44 @@ def prepare_archives(self) -> None:
self.archives = archives

def send_meta(self) -> None:
metrics = []
metrics: JsonList = []
for metricinfo in self.metrics:
metrics.append({
'name': metricinfo.name,
'derive': metricinfo.derive,
})

meta = {
'timestamp': self.load_timestamp.tv_sec * 1000,
'interval': self.interval,
'source': 'pcp-archive',
'metrics': metrics
}
self.send_message(**meta)
self.send_json(source='pcp-archive', interval=self.interval, timestamp=self.start_timestamp, metrics=metrics)
self.need_meta = False

def sample_archives(self) -> None:
timestamp = self.load_timestamp
timestamp = self.start_timestamp

for i, archive in enumerate(self.archives):
# TODO can this be smarter?
# continue when current archive isn't last and next archive starts before timestamp
if i != len(self.archives) - 1 and self.archives[i + 1].start.tv_sec < timestamp.tv_sec:
if i != len(self.archives) - 1 and self.archives[i + 1].start < timestamp:
continue

if timestamp.tv_sec < archive.start.tv_sec:
timestamp = archive.start
if timestamp < archive.start:
logging.debug("ligma balls")
timestamp = int(archive.start)

context = archive.context
context.pmSetMode(c_api.PM_MODE_INTERP | c_api.PM_XTB_SET(c_api.PM_TIME_MSEC), timestamp, self.interval)
logging.debug(f'timestamp:\t\t{timestamp}')
logging.debug(f'archive_start:\t{archive.start}')
logging.debug(f'archive_end:\t{context.pmGetArchiveEnd()}')
context.pmSetMode(c_api.PM_MODE_INTERP | c_api.PM_XTB_SET(c_api.PM_TIME_MSEC),
self.float_to_timeval(timestamp), self.interval)
self.sample(context)


def sample(self, current_context: pmapi.pmContext) -> None:
metrics = list(self.metrics)

pmids = current_context.pmLookupName([metric.name for metric in metrics])
descs = current_context.pmLookupDescs(pmids)

logging.debug('BEGIN SAMPLING')
while True:
fetched = []
try:
@@ -189,6 +188,7 @@ def sample(self, current_context: pmapi.pmContext) -> None:
self.send_updates(fetched)
fetched.clear()
except pmapi.pmErr as ex:
logging.debug(f'Fetching error: {ex}\t\tfetched: {fetched}')
if ex.errno() != c_api.PM_ERR_EOL:
raise ex
if len(fetched) > 0:
@@ -200,6 +200,7 @@ def parse_fetched_results(self, context: pmapi.pmContext, results: Any, descs: A
metrics = list(self.metrics)
samples = {}

samples['timestamp'] = float(results.contents.timestamp)
for i in range(results.contents.numpmid):
values: Union[dict, float] = defaultdict()
instances: Optional[List[str]] = None
@@ -227,43 +228,60 @@ def parse_fetched_results(self, context: pmapi.pmContext, results: Any, descs: A

return samples

def send_updates(self, samples: List[Dict[str, Union[float, List[float]]]]):
def calculate_sample_rate(self, value: float, old_value: Optional[float]) -> Union[float, bool]:
if old_value is not None and self.last_timestamp:
return (value - old_value) / (self.next_timestamp - self.last_timestamp)
else:
return False

def send_updates(self, samples: List[Dict]):
# data: List[List[Union[float, List[Optional[Union[float, bool]]]]]] = []
data: List[List[Union[float, List[float]]]] = []
last_samples = self.last_samples

for sample in samples:
self.next_timestamp = sample['timestamp']
sampled_values: List[Union[float, List[float]]] = []
for metricinfo in self.metrics:
value = sample[metricinfo.name]

if isinstance(value, dict):
# old_value = last_samples[metricinfo.name]
old_value = last_samples[metricinfo.name]
assert isinstance(value, dict)
if old_value == None:

Check notice — Code scanning / CodeQL: Testing equality to None. Testing for None should use the 'is' operator.
old_value = {}

# If we have less or more keys the data changed, send a meta message.
# if value.keys() != old_value.keys():
# self.need_meta = True
if value.keys() != old_value.keys():
self.need_meta = True

# if metricinfo.derive == 'rate':
# instances: List[Optional[Union[float, bool]]] = []
if metricinfo.derive == 'rate':
instances: List[Optional[Union[float, bool]]] = []

# for key, val in value.items():
# instances.append(self.calculate_sample_rate(val, old_value.get(key)))
for key, val in value.items():
instances.append(self.calculate_sample_rate(val, old_value.get(key)))

# sampled_values.append(instances)
# else:
# sampled_values.append(list(value.values()))
sampled_values.append(list(value.values()))
sampled_values.append(instances)
else:
sampled_values.append(list(value.values()))
else:
# if metricinfo.derive == 'rate':
# old_value = last_samples[metricinfo.name]
# data.append(self.calculate_sample_rate(value, old_value))
# else:
# data.append(value)
sampled_values.append(value)
old_value = last_samples.get(metricinfo.name)
assert isinstance(value, float)
# hack because I need some default value to initialize old_values in the first round of sampling
if not isinstance(old_value, float | None):
old_value = None

if metricinfo.derive == 'rate':
sampled_values.append(self.calculate_sample_rate(value, old_value))
else:
sampled_values.append(value)

data.append(sampled_values)
self.last_timestamp = self.next_timestamp
last_samples = sample

if self.need_meta:
self.send_meta()

self.last_samples = last_samples
self.send_data(json.dumps(data).encode())
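
calculate_sample_rate above is plain delta-value over delta-time, with False standing in for "no rate available yet". Restated with invented numbers (timestamps in milliseconds, as elsewhere in this file):

# a counter that climbed from 100 to 400 over one second of archive time
old_value, value = 100.0, 400.0
last_ts, next_ts = 1000.0, 2000.0   # milliseconds
rate = (value - old_value) / (next_ts - last_ts)
assert rate == 0.3                  # units per millisecond

# on the first round there is no old_value, so the method returns False instead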
