Skip to content

Commit

Permalink
Implement limit option
Browse files Browse the repository at this point in the history
  • Loading branch information
jelly committed Feb 21, 2024
1 parent 188e92b commit 920768d
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
17 changes: 15 additions & 2 deletions src/cockpit/channels/pcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def parse_options(self, options: JsonObject):
# if self.omit_instances and self.instances:
# raise ChannelError('protocol-error', message='')

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.

def sample(self, archive):
def sample(self, archive, total_fetched):
context = archive.context

# HACK: this is some utter sillyness, maybe we can construct our own pcp.pmapi.c_uint_Array_1
Expand All @@ -253,10 +253,16 @@ def sample(self, archive):
fetched = []
try:
for _ in range(self.archive_batch):
if total_fetched == self.limit:
self.send_updates(archive, fetched)
logger.debug('Reached limit, stopping')
return total_fetched
# Consider using the fetchGroup API https://pcp.readthedocs.io/en/latest/PG/PMAPI.html#fetchgroup-operation
# HACK: This is some pcp weirdness where it only accepts a PCP type list and not a Python list
# PMIDS <pcp.pmapi.c_uint_Array_1 object at 0x7ab92eaddb50>
results = context.pmFetch(pmids)
fetched.append(self.parse_fetched_results(context, results, descs))
total_fetched += 1

self.send_updates(archive, fetched)
fetched.clear()
Expand All @@ -270,6 +276,8 @@ def sample(self, archive):

break

return total_fetched

def parse_fetched_results(self, context: 'pmapi.pmContext', results: Any, descs: Any) -> Sample:
metrics = list(self.metrics)
samples: dict[str, float | list[float]] = {}
Expand Down Expand Up @@ -414,6 +422,7 @@ def send_updates(self, archive, samples: Sequence[Sample]) -> None:
self.send_data(json.dumps(data).encode())

def sample_archives(self, archives):
total_fetched = 0
for i, archive in enumerate(archives):
timestamp = self.start_timestamp

Expand All @@ -435,7 +444,9 @@ def sample_archives(self, archives):
except pmapi.pmErr as exc:
raise ChannelError('internal-error', message=str(exc)) from None

self.sample(archive)
total_fetched = self.sample(archive, total_fetched)
if total_fetched == self.limit:
return True
else:
return True

Expand Down Expand Up @@ -475,6 +486,8 @@ async def run(self, options: JsonObject) -> None:

self.sample_archives(archives)

self.done()

# while True:
#
# if all_read:
Expand Down
68 changes: 66 additions & 2 deletions test/pytest/test_pcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ def test_broken_archive(tmpdir_factory):
return pcp_dir


@pytest.fixture
def big_archive(tmpdir_factory):
    """Create a PCP archive with 1000 sequential samples of ``mock.value``.

    Each sample's value equals its timestamp (0..999), so tests can predict
    exactly which values arrive in which batch.
    """
    archive_dir = tmpdir_factory.mktemp('big-archive')
    log = pmi.pmiLogImport(f"{archive_dir}/0")
    log.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_U32, PM_INDOM_NULL,
                     PM_SEM_INSTANT, log.pmiUnits(0, 0, 0, 0, 0, 0))

    # one sample per second; the stored value mirrors the timestamp
    for second in range(1000):
        log.pmiPutValue("mock.value", None, str(second))
        log.pmiWrite(second, 0)

    log.pmiEnd()
    return archive_dir


@pytest.fixture
def test_archive(tmpdir_factory):
pcp_dir = tmpdir_factory.mktemp('mock-archives')
Expand Down Expand Up @@ -102,8 +117,8 @@ async def test_pcp_open_error(transport, test_archive):

@pytest.mark.asyncio
async def test_pcp_open(transport, test_archive):
_ = await transport.check_open('metrics1', source=str(test_archive),
metrics=[{"name": "mock.value"}])
ch = await transport.check_open('metrics1', source=str(test_archive),
metrics=[{"name": "mock.value"}])

_, data = await transport.next_frame()
# first message is always the meta message
Expand Down Expand Up @@ -136,6 +151,55 @@ async def test_pcp_open(transport, test_archive):
data = json.loads(data)
assert data == [[13], [14], [15]]

transport.check_close(channel=ch)


@pytest.mark.asyncio
async def test_pcp_big_archive(transport, big_archive):
    """An archive larger than the batch size is streamed in 60-sample batches."""
    # keep the channel handle so we can close it afterwards, like the
    # other tests in this file do (a discarded channel is never closed)
    ch = await transport.check_open('metrics1', source=str(big_archive),
                                    metrics=[{"name": "mock.value"}])

    _, data = await transport.next_frame()
    # first message is always the meta message
    meta = json.loads(data)

    # TODO: assert helper function?
    assert meta['timestamp'] == 0
    assert meta['interval'] == 1000  # default interval
    assert meta['source'] == str(big_archive)

    metrics = meta['metrics']
    assert len(metrics) == 1

    metric = metrics[0]
    assert metric['name'] == 'mock.value'
    assert 'derive' not in metric
    assert metric['semantic'] == 'instant'

    _, data = await transport.next_frame()
    data = json.loads(data)
    # archives batch size is hardcoded to 60
    # TODO import?
    assert data == [[i] for i in range(60)]

    transport.check_close(channel=ch)


@pytest.mark.asyncio
async def test_pcp_limit_archive(transport, big_archive):
    """With ``limit=30`` the channel stops after sending 30 samples."""
    ch = await transport.check_open('metrics1', source=str(big_archive),
                                    limit=30,
                                    metrics=[{"name": "mock.value"}])

    # first message is always the meta message; verify the parts that
    # do not depend on the limit option
    _, data = await transport.next_frame()
    meta = json.loads(data)
    assert meta['source'] == str(big_archive)
    assert len(meta['metrics']) == 1
    assert meta['metrics'][0]['name'] == 'mock.value'

    # only the first `limit` samples are sent, even though the archive
    # holds 1000 and the batch size is 60
    _, data = await transport.next_frame()
    data = json.loads(data)
    assert data == [[i] for i in range(30)]

    transport.check_close(channel=ch)


@pytest.mark.asyncio
async def test_pcp_broken_archive(transport, test_broken_archive):
Expand Down

0 comments on commit 920768d

Please sign in to comment.