api: Create logic for automatically cleaning up dead streams (#2013)
* api: Create API for cleaning up actually-inactive streams

* api: Move the filtering logic out of activeCleanup

* api: Fix the active cleanup query

- We were missing the parentId check, so it was reading way
  more streams than necessary (all child streams/sessions).
- There are A LOT of forgotten streams in our DB (400k parents),
  so we need to put some limit on this query.
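
For illustration, a minimal sketch of the shape the fixed query needs
(assuming node-postgres and the API's convention of storing stream
documents in a JSONB `data` column; the repo's actual query helper may
differ):

    import { Pool } from "pg";

    const pool = new Pool({ connectionString: process.env.POSTGRES_URL });

    // Only scan parent streams (parentId unset) that still claim to be
    // active, and cap the batch so one run can't read 400k rows.
    async function listLostActiveStreams(cutoff: number, limit = 1000) {
      const { rows } = await pool.query(
        `SELECT data FROM stream
          WHERE data->>'parentId' IS NULL
            AND (data->>'isActive')::boolean = true
            AND (data->>'lastSeen')::bigint < $1
          LIMIT $2`,
        [cutoff, limit]
      );
      return rows.map((r) => r.data);
    }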

* api: Avoid using all DB resources for cleanup

* api: Fix tests (they were broken before!)

* Revert "api: Avoid using all DB resources for cleanup"

This reverts commit aff5120.

* api: Create separate DB conn pool for bg jobs

Use it on:
- Webhooks queue handler
- Tasks queue handler
- Usage billing job
- (new) Active cleanup job
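
A minimal sketch of the idea, assuming node-postgres (the pool sizes here
are illustrative; the real values come from the new CLI flags in the
app-router diff below):

    import { Pool } from "pg";

    const connectionString = process.env.POSTGRES_URL;

    // Request handling and background jobs draw from separate bounded
    // pools, so a heavy cleanup or billing run cannot starve API queries.
    export const apiPool = new Pool({ connectionString, max: 10 });
    export const jobsPool = new Pool({
      connectionString,
      max: 2, // keep background work from exhausting DB connections
      application_name: "api-jobs", // mirrors the `${appName}-jobs` naming
    });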

* api: Fix tests

* api: Stop triggering isActive clean-up from GETs

Still monkey-patch responses to keep the same UX, but the
actual recording processing will only run once the active
clean-up job is triggered.
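
A sketch of that read-path behavior (helper name hypothetical; note a
later commit in this PR removes the monkey-patching entirely):

    // GET handlers no longer write anything; they only patch the returned
    // object so a lost stream *looks* inactive until the cleanup job runs.
    function withActivePatched<T extends { isActive?: boolean; lastSeen?: number }>(
      stream: T,
      activeTimeout: number
    ): T {
      const lost = Date.now() - (stream.lastSeen ?? 0) > activeTimeout;
      return stream.isActive && lost ? { ...stream, isActive: false } : stream;
    }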

* api: Add tests for cleanup job

* api: Change default limit to 1000

It's a bit too expensive to do 10000 at once

* api: Fix override of isActive (breaking tests)

* api/stream: Clean-up lost sessions as well as parents

Turns out there are also children lost from their parents out
there, so we have to list them all.

* api/cannon: Make sure we clear unused sessions/streams objects

* api/cannon: Fix typing of handleRecordingWaitingChecks

* api: Make sure we always clear all isActive fields

* api/stream: Fix clean-up streams filtering logic

* .github: Create cronjob to clean-up inactive streams

* api: Clear streams that already have assets generated

* api/stream: Remove ordering from list query

The ordering just makes things super slow and isn't useful.

* api/stream: Clean all child streams from same sess at once

* api/stream: Make size of clean-up more predictable

Don't filter on the outer level, but rather make sure
the pipeline supports an array of child streams.

* api/stream: Remove test for deduping by sessionId

Now we clean all of them, but we merge them before the cleanup
so each session is only processed once. There's no easy way to
catch that in tests.
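
The merge itself is simple bucketing (hypothetical helper, sketching the
dedupe described above):

    // Group child streams by sessionId so each session's recording
    // pipeline runs once, over the whole array of its child streams.
    function groupBySession<T extends { id: string; sessionId?: string }>(
      children: T[]
    ): T[][] {
      const bySession = new Map<string, T[]>();
      for (const child of children) {
        const key = child.sessionId ?? child.id;
        bySession.set(key, [...(bySession.get(key) ?? []), child]);
      }
      return [...bySession.values()];
    }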

* api/stream: Stop using lodash

* api/stream: Run clean-up synchronously on API

Also parallelize parent streams

* api/stream: Default streamId to empty str

* api/stream: Parallelize child stream processing

* api/stream: Only use isActive field from streams

There are some bugged sessions that got isActive set on them

* Revert "api/stream: Only use isActive field from streams"

This reverts commit 8621089.

It was a staging-only thing, likely from a development version.

* api/stream: Fallback to createdAt if lastSeen never set

* api/stream: Remove monkey-patching of stream objects 😍

* [DEV] Increase log level to debug in staging

* api/stream: Only trigger recording processing when stream goes offline

* api/stream: Check child streams shouldCleanupIsActive as well
victorges committed May 6, 2024
1 parent 8eefa40 commit 722e01f
Showing 23 changed files with 469 additions and 201 deletions.
File renamed without changes.
3 changes: 3 additions & 0 deletions .github/workflows/cron-streams-active-cleanup.yaml
@@ -12,6 +12,9 @@ jobs:
           - name: "staging"
             hostname: "livepeer.monster"
             api-token-secret: LP_STAGING_API_ADMIN_TOKEN
+          - name: production
+            hostname: "livepeer.studio"
+            api-token-secret: LP_API_ADMIN_TOKEN
     runs-on: ubuntu-latest
     steps:
       - name: Clean-up ${{ matrix.env.name }} active streams
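
The workflow step presumably boils down to a POST against the admin
cleanup route exercised by the tests below; a hedged sketch (the /api
prefix and the exact auth header shape are assumptions):

    const res = await fetch(
      "https://livepeer.studio/api/stream/job/active-cleanup",
      {
        method: "POST",
        headers: { Authorization: `Bearer ${process.env.LP_API_ADMIN_TOKEN}` },
      }
    );
    const { cleanedUp } = await res.json();
    console.log(`cleaned up ${cleanedUp.length} streams`);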
16 changes: 10 additions & 6 deletions packages/api/src/app-router.ts
@@ -53,11 +53,13 @@ const PROM_BUNDLE_OPTS: promBundle.Opts = {
   },
 };
 
-export default async function makeApp(params: CliArgs) {
+export default async function appRouter(params: CliArgs) {
   const {
     httpPrefix,
     postgresUrl,
     postgresReplicaUrl,
+    postgresConnPoolSize: pgPoolSize,
+    postgresJobsConnPoolSize: pgJobsPoolSize,
     defaultCacheTtl,
     frontendDomain,
     supportAddr,
@@ -95,11 +97,12 @@ export default async function makeApp(params: CliArgs) {
 
   // Storage init
   const bodyParser = require("body-parser");
-  const [db, store] = await makeStore({
-    postgresUrl,
-    postgresReplicaUrl,
-    appName: ownRegion ? `${ownRegion}-api` : "api",
-  });
+  const appName = ownRegion ? `${ownRegion}-api` : "api";
+  const pgBaseParams = { postgresUrl, postgresReplicaUrl, appName };
+  const [db, jobsDb, store] = await makeStore(
+    { ...pgBaseParams, poolMaxSize: pgPoolSize },
+    { ...pgBaseParams, poolMaxSize: pgJobsPoolSize, appName: `${appName}-jobs` }
+  );
   if (defaultCacheTtl > 0) {
     cache.init({ stdTTL: defaultCacheTtl });
   }
@@ -285,6 +288,7 @@ export default async function makeApp(params: CliArgs) {
     taskScheduler,
     store,
     db,
+    jobsDb,
     queue,
   };
 }
168 changes: 161 additions & 7 deletions packages/api/src/controllers/stream.test.ts
@@ -2,28 +2,28 @@ import { json as bodyParserJson } from "body-parser";
 import { v4 as uuid } from "uuid";
 
 import {
-  ObjectStore,
   MultistreamTarget,
+  ObjectStore,
   Stream,
+  StreamHealthPayload,
   StreamPatchPayload,
-  User,
   StreamSetActivePayload,
-  StreamHealthPayload,
+  User,
 } from "../schema/types";
 import { db } from "../store";
 import { DBStream } from "../store/stream-table";
 import { DBWebhook } from "../store/webhook-table";
 import {
+  AuxTestServer,
   TestClient,
   clearDatabase,
-  startAuxTestServer,
   setupUsers,
-  AuxTestServer,
+  startAuxTestServer,
 } from "../test-helpers";
 import serverPromise, { TestServer } from "../test-server";
 import { semaphore, sleep } from "../util";
 import { generateUniquePlaybackId } from "./generate-keys";
-import { extractUrlFrom, extractRegionFrom } from "./stream";
+import { ACTIVE_TIMEOUT, extractRegionFrom, extractUrlFrom } from "./stream";
 
 const uuidRegex = /[0-9a-f]+(-[0-9a-f]+){4}/;
 
@@ -1288,7 +1288,7 @@ describe("controllers/stream", () => {
     });
   });
 
-  describe("webhooks", () => {
+  describe("incoming hooks", () => {
     let stream: Stream;
     let data;
     let res;
@@ -1658,6 +1658,160 @@ describe("controllers/stream", () => {
     });
   });
 
+  describe("active clean-up", () => {
+    let webhookServer: AuxTestServer;
+    let hookSem: ReturnType<typeof semaphore>;
+    let hookPayload: any;
+
+    beforeAll(async () => {
+      webhookServer = await startAuxTestServer();
+      webhookServer.app.use(bodyParserJson());
+      webhookServer.app.post("/captain/hook", bodyParserJson(), (req, res) => {
+        hookPayload = req.body;
+        hookSem.release();
+        res.status(204).end();
+      });
+    });
+
+    afterAll(() => webhookServer.close());
+
+    beforeEach(async () => {
+      hookPayload = undefined;
+      hookSem = semaphore();
+
+      client.jwtAuth = nonAdminToken;
+      await client.post("/webhook", {
+        name: "stream-idle-hook",
+        events: ["stream.idle"],
+        url: `http://localhost:${webhookServer.port}/captain/hook`,
+      });
+    });
+
+    const waitHookCalled = async () => {
+      await hookSem.wait(3000);
+      expect(hookPayload).toBeDefined();
+      hookSem = semaphore();
+    };
+
+    it("should not clean streams that are still active", async () => {
+      let res = await client.post("/stream", { ...postMockStream });
+      expect(res.status).toBe(201);
+      const stream = await res.json();
+      await db.stream.update(stream.id, {
+        isActive: true,
+        lastSeen: Date.now() - ACTIVE_TIMEOUT / 2,
+      });
+
+      client.jwtAuth = adminToken;
+      res = await client.post(`/stream/job/active-cleanup`);
+      expect(res.status).toBe(200);
+      const { cleanedUp } = await res.json();
+      expect(cleanedUp).toHaveLength(0);
+    });
+
+    it("should clean streams that are active but lost", async () => {
+      let res = await client.post("/stream", { ...postMockStream });
+      expect(res.status).toBe(201);
+      const stream = await res.json();
+      await db.stream.update(stream.id, {
+        isActive: true,
+        lastSeen: Date.now() - ACTIVE_TIMEOUT - 1,
+      });
+
+      client.jwtAuth = adminToken;
+      res = await client.post(`/stream/job/active-cleanup`);
+      expect(res.status).toBe(200);
+      const { cleanedUp } = await res.json();
+      expect(cleanedUp).toHaveLength(1);
+      expect(cleanedUp[0].id).toEqual(stream.id);
+
+      await waitHookCalled();
+
+      const updatedStream = await db.stream.get(stream.id);
+      expect(updatedStream.isActive).toBe(false);
+    });
+
+    it("should clean multiple streams at once, respecting limit", async () => {
+      for (let i = 0; i < 3; i++) {
+        let res = await client.post("/stream", { ...postMockStream });
+        expect(res.status).toBe(201);
+        const stream = await res.json();
+        await db.stream.update(stream.id, {
+          isActive: true,
+          lastSeen: Date.now() - ACTIVE_TIMEOUT - 1,
+        });
+      }
+
+      client.jwtAuth = adminToken;
+      const res = await client.post(`/stream/job/active-cleanup?limit=2`);
+      expect(res.status).toBe(200);
+      const { cleanedUp } = await res.json();
+      expect(cleanedUp).toHaveLength(2);
+    });
+
+    it("should clean lost sessions whose parents are not active", async () => {
+      let res = await client.post("/stream", { ...postMockStream });
+      expect(res.status).toBe(201);
+      let stream: Stream = await res.json();
+
+      const sessionId = uuid();
+      res = await client.post(
+        `/stream/${stream.id}/stream?sessionId=${sessionId}`,
+        {
+          name: `video+${stream.playbackId}`,
+        }
+      );
+      expect(res.status).toBe(201);
+      const child: Stream = await res.json();
+
+      await db.stream.update(child.id, {
+        isActive: true,
+        lastSeen: Date.now() - ACTIVE_TIMEOUT - 1,
+      });
+      stream = await db.stream.get(stream.id);
+      expect(stream.isActive).toBe(false);
+
+      client.jwtAuth = adminToken;
+      res = await client.post(`/stream/job/active-cleanup`);
+      expect(res.status).toBe(200);
+      const { cleanedUp } = await res.json();
+      expect(cleanedUp).toHaveLength(1);
+      expect(cleanedUp[0].id).toEqual(child.id);
+    });
+
+    it("cleans only the parent if both parent and child are lost", async () => {
+      let res = await client.post("/stream", { ...postMockStream });
+      expect(res.status).toBe(201);
+      let stream: Stream = await res.json();
+
+      const sessionId = uuid();
+      res = await client.post(
+        `/stream/${stream.id}/stream?sessionId=${sessionId}`,
+        {
+          name: `video+${stream.playbackId}`,
+        }
+      );
+      expect(res.status).toBe(201);
+      const child: Stream = await res.json();
+
+      await db.stream.update(child.id, {
+        isActive: true,
+        lastSeen: Date.now() - ACTIVE_TIMEOUT - 1,
+      });
+      await db.stream.update(stream.id, {
+        isActive: true,
+        lastSeen: Date.now() - ACTIVE_TIMEOUT - 1,
+      });
+
+      client.jwtAuth = adminToken;
+      res = await client.post(`/stream/job/active-cleanup`);
+      expect(res.status).toBe(200);
+      const { cleanedUp } = await res.json();
+      expect(cleanedUp).toHaveLength(1);
+      expect(cleanedUp[0].id).toEqual(stream.id);
+    });
+  });
+
   describe("profiles", () => {
     let stream: Stream;
     let fractionalStream: Stream;
