api: Create logic for automatically cleaning up dead streams #2013

Merged
merged 34 commits on May 6, 2024
Changes from all commits
34 commits
33d5601
api: Create API for clean-up actually inactive streams
victorges Jan 17, 2024
e1783ad
api: Move the filtering logic out of activeCleanup
victorges Jan 17, 2024
0822fac
api: Fix the active cleanup query
victorges Jan 17, 2024
ef6df93
api: Avoid using all DB resources for cleanup
victorges Jan 17, 2024
672fd27
api: Fix tests (they were broken before!)
victorges Jan 17, 2024
aefeb54
Revert "api: Avoid using all DB resources for cleanup"
victorges Apr 25, 2024
46c6ccd
api: Create separate DB conn pool for bg jobs
victorges Apr 25, 2024
7bc71cc
api: Fix tests
victorges Apr 26, 2024
1d1e433
api: Stop triggering isActive clean-up from GETs
victorges Apr 26, 2024
181f9b5
api: Add tests for cleanup job
victorges Apr 26, 2024
d60370e
api: Change default limit to 1000
victorges Apr 26, 2024
4af36a4
api: Fix override of isActive (breaking tests)
victorges Apr 26, 2024
a2423ea
api/stream: Clean-up lost sessions as well as parents
victorges May 3, 2024
68770e5
api/cannon: Make sure we clear unused sessions/streams objects
victorges May 3, 2024
3597feb
api/cannon: Fix typing of handleRecordingWaitingChecks
victorges May 3, 2024
8034798
api: Make sure we always clear all isActive fields
victorges May 3, 2024
7f370ba
api/stream: Fix clean-up streams filtering logic
victorges May 3, 2024
0f517b7
.github: Create cronjob to clean-up inactive streams
victorges Jan 17, 2024
fb3b9ba
api: Clear streams that already have assets generated
victorges May 3, 2024
c82362b
api/stream: Remove ordering from list query
victorges May 3, 2024
02dfce7
api/stream: Clean all child streams from same sess at once
victorges May 4, 2024
85967f8
api/stream: Make size of clean-up more predictable
victorges May 4, 2024
ffe8729
api/stream: Remove test for deduping by sessionId
victorges May 4, 2024
b953e65
api/stream: Stop using lodash
victorges May 4, 2024
0c2fb67
api/stream: Run clean-up synchronously on API
victorges May 4, 2024
d2d7fbe
api/stream: Default streamId to empty str
victorges May 4, 2024
ceea027
api/stream: Parallelize child stream processing
victorges May 4, 2024
8621089
api/stream: Only use isActive field from streams
victorges May 5, 2024
5724c14
Revert "api/stream: Only use isActive field from streams"
victorges May 6, 2024
1178457
api/stream: Fallback to createdAt if lastSeen never set
victorges May 6, 2024
269e235
api/stream: Remove monkey-patching of stream objects 😍
victorges May 6, 2024
4ba4937
[DEV] Increase log level to debug in staging
victorges May 6, 2024
ce4d436
api/stream: Only trigger recording processing when stream goes offline
victorges May 6, 2024
63d39f7
api/stream: Check child streams shouldCleanupIsActive as well
victorges May 6, 2024
File renamed without changes.
3 changes: 3 additions & 0 deletions .github/workflows/cron-streams-active-cleanup.yaml
@@ -12,6 +12,9 @@ jobs:
- name: "staging"
hostname: "livepeer.monster"
api-token-secret: LP_STAGING_API_ADMIN_TOKEN
- name: production
hostname: "livepeer.studio"
api-token-secret: LP_API_ADMIN_TOKEN
runs-on: ubuntu-latest
steps:
- name: Clean-up ${{ matrix.env.name }} active streams
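
The workflow body is truncated above, but the matrix shows the job running once per environment against the admin API. A rough TypeScript equivalent of the request the cron step presumably makes (the real workflow likely shells out to curl; the /api prefix, header format, and whether a limit is passed are assumptions — the route and the limit query parameter appear in the tests further down, and the 1000 default comes from the commit list):

```ts
// Hypothetical equivalent of the cron step's request; not taken from the workflow file.
async function triggerActiveCleanup(hostname: string, apiToken: string, limit = 1000) {
  const res = await fetch(
    `https://${hostname}/api/stream/job/active-cleanup?limit=${limit}`,
    {
      method: "POST",
      headers: { authorization: `Bearer ${apiToken}` },
    }
  );
  if (!res.ok) {
    throw new Error(`active-cleanup failed with status ${res.status}`);
  }
  // The response lists the streams whose isActive flag was cleared.
  const { cleanedUp } = (await res.json()) as { cleanedUp: unknown[] };
  return cleanedUp;
}
```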
16 changes: 10 additions & 6 deletions packages/api/src/app-router.ts
@@ -53,11 +53,13 @@ const PROM_BUNDLE_OPTS: promBundle.Opts = {
},
};

export default async function makeApp(params: CliArgs) {
export default async function appRouter(params: CliArgs) {
const {
httpPrefix,
postgresUrl,
postgresReplicaUrl,
postgresConnPoolSize: pgPoolSize,
postgresJobsConnPoolSize: pgJobsPoolSize,
defaultCacheTtl,
frontendDomain,
supportAddr,
@@ -95,11 +97,12 @@ export default async function makeApp(params: CliArgs) {

// Storage init
const bodyParser = require("body-parser");
const [db, store] = await makeStore({
postgresUrl,
postgresReplicaUrl,
appName: ownRegion ? `${ownRegion}-api` : "api",
});
const appName = ownRegion ? `${ownRegion}-api` : "api";
const pgBaseParams = { postgresUrl, postgresReplicaUrl, appName };
const [db, jobsDb, store] = await makeStore(
{ ...pgBaseParams, poolMaxSize: pgPoolSize },
{ ...pgBaseParams, poolMaxSize: pgJobsPoolSize, appName: `${appName}-jobs` }
);
if (defaultCacheTtl > 0) {
cache.init({ stdTTL: defaultCacheTtl });
}
@@ -285,6 +288,7 @@ export default async function makeApp(params: CliArgs) {
taskScheduler,
store,
db,
jobsDb,
queue,
};
}
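
The change above gives background jobs their own, separately sized connection pool (jobsDb) so that a long-running cleanup cannot exhaust the connections serving API requests. A sketch of how two such pools might be built, assuming node-postgres; the real makeStore returns [db, jobsDb, store] as shown, but everything inside this function is an assumption:

```ts
import { Pool } from "pg";

interface PoolParams {
  postgresUrl: string;
  appName: string;
  poolMaxSize?: number;
}

// Sketch: one pool for interactive API traffic, one independently sized pool
// reserved for background jobs such as the active-stream cleanup.
function makePools(apiParams: PoolParams, jobsParams: PoolParams) {
  const makePool = ({ postgresUrl, appName, poolMaxSize }: PoolParams) =>
    new Pool({
      connectionString: postgresUrl,
      application_name: appName, // e.g. "region-api" vs "region-api-jobs"
      max: poolMaxSize ?? 10,
    });

  return {
    db: makePool(apiParams),
    jobsDb: makePool(jobsParams),
  };
}
```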
168 changes: 161 additions & 7 deletions packages/api/src/controllers/stream.test.ts
@@ -2,28 +2,28 @@ import { json as bodyParserJson } from "body-parser";
import { v4 as uuid } from "uuid";

import {
ObjectStore,
MultistreamTarget,
ObjectStore,
Stream,
StreamHealthPayload,
StreamPatchPayload,
User,
StreamSetActivePayload,
StreamHealthPayload,
User,
} from "../schema/types";
import { db } from "../store";
import { DBStream } from "../store/stream-table";
import { DBWebhook } from "../store/webhook-table";
import {
AuxTestServer,
TestClient,
clearDatabase,
startAuxTestServer,
setupUsers,
AuxTestServer,
startAuxTestServer,
} from "../test-helpers";
import serverPromise, { TestServer } from "../test-server";
import { semaphore, sleep } from "../util";
import { generateUniquePlaybackId } from "./generate-keys";
import { extractUrlFrom, extractRegionFrom } from "./stream";
import { ACTIVE_TIMEOUT, extractRegionFrom, extractUrlFrom } from "./stream";

const uuidRegex = /[0-9a-f]+(-[0-9a-f]+){4}/;

@@ -1288,7 +1288,7 @@ describe("controllers/stream", () => {
});
});

describe("webhooks", () => {
describe("incoming hooks", () => {
let stream: Stream;
let data;
let res;
@@ -1658,6 +1658,160 @@ describe("controllers/stream", () => {
});
});

describe("active clean-up", () => {
let webhookServer: AuxTestServer;
let hookSem: ReturnType<typeof semaphore>;
let hookPayload: any;

beforeAll(async () => {
webhookServer = await startAuxTestServer();
webhookServer.app.use(bodyParserJson());
webhookServer.app.post("/captain/hook", bodyParserJson(), (req, res) => {
hookPayload = req.body;
hookSem.release();
res.status(204).end();
});
});

afterAll(() => webhookServer.close());

beforeEach(async () => {
hookPayload = undefined;
hookSem = semaphore();

client.jwtAuth = nonAdminToken;
await client.post("/webhook", {
name: "stream-idle-hook",
events: ["stream.idle"],
url: `http://localhost:${webhookServer.port}/captain/hook`,
});
});

const waitHookCalled = async () => {
await hookSem.wait(3000);
expect(hookPayload).toBeDefined();
hookSem = semaphore();
};

it("should not clean streams that are still active", async () => {
let res = await client.post("/stream", { ...postMockStream });
expect(res.status).toBe(201);
const stream = await res.json();
await db.stream.update(stream.id, {
isActive: true,
lastSeen: Date.now() - ACTIVE_TIMEOUT / 2,
});

client.jwtAuth = adminToken;
res = await client.post(`/stream/job/active-cleanup`);
expect(res.status).toBe(200);
const { cleanedUp } = await res.json();
expect(cleanedUp).toHaveLength(0);
});

it("should clean streams that are active but lost", async () => {
let res = await client.post("/stream", { ...postMockStream });
expect(res.status).toBe(201);
const stream = await res.json();
await db.stream.update(stream.id, {
isActive: true,
lastSeen: Date.now() - ACTIVE_TIMEOUT - 1,
});

client.jwtAuth = adminToken;
res = await client.post(`/stream/job/active-cleanup`);
expect(res.status).toBe(200);
const { cleanedUp } = await res.json();
expect(cleanedUp).toHaveLength(1);
expect(cleanedUp[0].id).toEqual(stream.id);

await waitHookCalled();

const updatedStream = await db.stream.get(stream.id);
expect(updatedStream.isActive).toBe(false);
});

it("should clean multiple streams at once, respecting limit", async () => {
for (let i = 0; i < 3; i++) {
let res = await client.post("/stream", { ...postMockStream });
expect(res.status).toBe(201);
const stream = await res.json();
await db.stream.update(stream.id, {
isActive: true,
lastSeen: Date.now() - ACTIVE_TIMEOUT - 1,
});
}

client.jwtAuth = adminToken;
const res = await client.post(`/stream/job/active-cleanup?limit=2`);
expect(res.status).toBe(200);
const { cleanedUp } = await res.json();
expect(cleanedUp).toHaveLength(2);
});

it("should clean lost sessions whose parents are not active", async () => {
let res = await client.post("/stream", { ...postMockStream });
expect(res.status).toBe(201);
let stream: Stream = await res.json();

const sessionId = uuid();
res = await client.post(
`/stream/${stream.id}/stream?sessionId=${sessionId}`,
{
name: `video+${stream.playbackId}`,
}
);
expect(res.status).toBe(201);
const child: Stream = await res.json();

await db.stream.update(child.id, {
isActive: true,
lastSeen: Date.now() - ACTIVE_TIMEOUT - 1,
});
stream = await db.stream.get(stream.id);
expect(stream.isActive).toBe(false);

client.jwtAuth = adminToken;
res = await client.post(`/stream/job/active-cleanup`);
expect(res.status).toBe(200);
const { cleanedUp } = await res.json();
expect(cleanedUp).toHaveLength(1);
expect(cleanedUp[0].id).toEqual(child.id);
});

it("cleans only the parent if both parent and child are lost", async () => {
let res = await client.post("/stream", { ...postMockStream });
expect(res.status).toBe(201);
let stream: Stream = await res.json();

const sessionId = uuid();
res = await client.post(
`/stream/${stream.id}/stream?sessionId=${sessionId}`,
{
name: `video+${stream.playbackId}`,
}
);
expect(res.status).toBe(201);
const child: Stream = await res.json();

await db.stream.update(child.id, {
isActive: true,
lastSeen: Date.now() - ACTIVE_TIMEOUT - 1,
});
await db.stream.update(stream.id, {
isActive: true,
lastSeen: Date.now() - ACTIVE_TIMEOUT - 1,
});

client.jwtAuth = adminToken;
res = await client.post(`/stream/job/active-cleanup`);
expect(res.status).toBe(200);
const { cleanedUp } = await res.json();
expect(cleanedUp).toHaveLength(1);
expect(cleanedUp[0].id).toEqual(stream.id);
});
});

describe("profiles", () => {
let stream: Stream;
let fractionalStream: Stream;
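
The controller code for POST /stream/job/active-cleanup is not part of the visible diff, but its contract can be read off the tests above: select streams that are still flagged active, apply shouldCleanupIsActive (including to child session streams), respect the limit parameter, clear isActive, report only the parent when both parent and child are lost, and fire a stream.idle webhook. A loose, hypothetical outline of that flow, reusing the StreamLike/shouldCleanupIsActive sketch from earlier; none of the helper parameters below are names from the PR:

```ts
// Hypothetical outline reconstructed from the tests; the real handler in
// packages/api/src/controllers/stream.ts is not shown in this diff.
async function activeCleanupSketch(
  listActiveStreams: (limit: number) => Promise<StreamLike[]>, // assumed query helper
  clearIsActive: (stream: StreamLike) => Promise<void>, // clears parent + lost children
  fireIdleWebhook: (stream: StreamLike) => Promise<void>, // emits stream.idle
  limit = 1000
) {
  const candidates = await listActiveStreams(limit);
  const cleanedUp: StreamLike[] = [];

  for (const stream of candidates) {
    if (!shouldCleanupIsActive(stream)) continue;
    // When both a parent and its child session are lost, only the parent is
    // reported; a lost child whose parent is already inactive is reported itself.
    await clearIsActive(stream);
    await fireIdleWebhook(stream);
    cleanedUp.push(stream);
  }

  return { cleanedUp }; // shape matches what the tests read from the response
}
```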