From 70bdda2add1c9b4f8ce83eb4e7d9dcd1a2945f89 Mon Sep 17 00:00:00 2001 From: gioelecerati <50955448+gioelecerati@users.noreply.github.com> Date: Thu, 21 Dec 2023 19:23:55 +0100 Subject: [PATCH] db: cache objects with ttl (#1997) * db: cache objects with ttl * api: Make sure cache objects are not shared * api/cache: Allow custom ttl for cache Also remove maxKeys which is bad (it throws an error, no LRU etc). * api/playback: Add cache to playback * api: Make cache gets built-in on db * api/playback: Cache object store for assets * api/playback: Fetch streams before assets We have huge traffic now, so let's just switch their order. We can think about optimizing this better later. * api: Fix GetOptions default values useReplica was defaulting to false when the cache: true option was set. * api: Fix tests * api: make the cache into a class * api: Make sure to cache objectstore queries * api/db: Improve caching logic - Simplify getOrSet on read - Always write to cache even if not reading from cache - Update cache on writes (helps tests more than prod) - Make copies of objects when reading and writing to cache * api: Remove unnecessary cache flushes on tests * api/acl: Make sure all async flows have cache * api: Remove unused import * api/cache: Make cache TTL configurable --------- Co-authored-by: Victor Elias --- packages/api/src/app-router.ts | 5 ++ .../src/controllers/access-control.test.ts | 1 + .../api/src/controllers/access-control.ts | 41 +++++++---- packages/api/src/controllers/asset.ts | 2 +- packages/api/src/controllers/playback.test.ts | 2 + packages/api/src/controllers/playback.ts | 73 +++++++++++-------- packages/api/src/controllers/session.ts | 8 +- packages/api/src/controllers/stream.ts | 2 +- packages/api/src/middleware/auth.test.ts | 8 +- packages/api/src/middleware/auth.ts | 9 ++- packages/api/src/parse-cli.ts | 5 ++ packages/api/src/store/cache.ts | 59 +++++++++++++++ packages/api/src/store/table.ts | 36 ++++++++- packages/api/src/store/types.ts | 1 + 14 
files changed, 195 insertions(+), 57 deletions(-) create mode 100644 packages/api/src/store/cache.ts diff --git a/packages/api/src/app-router.ts b/packages/api/src/app-router.ts index ca9e671b31..66c4352c4a 100644 --- a/packages/api/src/app-router.ts +++ b/packages/api/src/app-router.ts @@ -27,6 +27,7 @@ import { setupTus, setupTestTus } from "./controllers/asset"; import * as fcl from "@onflow/fcl"; import createFrontend from "@livepeer.studio/www"; import { NotFoundError } from "./store/errors"; +import { cache } from "./store/cache"; enum OrchestratorSource { hardcoded = "hardcoded", @@ -57,6 +58,7 @@ export default async function makeApp(params: CliArgs) { httpPrefix, postgresUrl, postgresReplicaUrl, + defaultCacheTtl, frontendDomain, supportAddr, sendgridTemplateId, @@ -98,6 +100,9 @@ export default async function makeApp(params: CliArgs) { postgresReplicaUrl, appName: ownRegion ? `${ownRegion}-api` : "api", }); + if (defaultCacheTtl > 0) { + cache.init({ stdTTL: defaultCacheTtl }); + } // RabbitMQ const queue: Queue = amqpUrl diff --git a/packages/api/src/controllers/access-control.test.ts b/packages/api/src/controllers/access-control.test.ts index 5b46a26c64..d799c11f84 100644 --- a/packages/api/src/controllers/access-control.test.ts +++ b/packages/api/src/controllers/access-control.test.ts @@ -195,6 +195,7 @@ describe("controllers/signing-key", () => { }); expect(res.status).toBe(204); await db.user.update(gatedAsset.userId, { suspended: true }); + const res2 = await client.post("/access-control/gate", { stream: `video+${gatedAsset.playbackId}`, type: "jwt", diff --git a/packages/api/src/controllers/access-control.ts b/packages/api/src/controllers/access-control.ts index 15f4617b18..73ba0ba450 100644 --- a/packages/api/src/controllers/access-control.ts +++ b/packages/api/src/controllers/access-control.ts @@ -20,9 +20,9 @@ import { fetchWithTimeoutAndRedirects } from "../util"; import fetch from "node-fetch"; import { WithID } from "../store/types"; import { 
DBStream } from "../store/stream-table"; -import { getViewers } from "./usage"; import { HACKER_DISABLE_CUTOFF_DATE } from "./utils/notification"; import { isFreeTierUser } from "./helpers"; +import { cache } from "../store/cache"; const WEBHOOK_TIMEOUT = 30 * 1000; const MAX_ALLOWED_VIEWERS_FOR_FREE_TIER = 5; @@ -102,9 +102,16 @@ app.post( validatePost("access-control-gate-payload"), async (req, res) => { const playbackId = req.body.stream.replace(/^\w+\+/, ""); - const content = - (await db.stream.getByPlaybackId(playbackId)) || - (await db.asset.getByPlaybackId(playbackId)); + + let content = await cache.getOrSet( + `acl-content-${playbackId}`, + async () => { + return ( + (await db.stream.getByPlaybackId(playbackId)) || + (await db.asset.getByPlaybackId(playbackId)) + ); + } + ); res.set("Cache-Control", "max-age=120,stale-while-revalidate=600"); @@ -116,7 +123,7 @@ app.post( throw new NotFoundError("Content not found"); } - const user = await db.user.get(content.userId); + let user = await db.user.get(content.userId, { useCache: true }); if (user.suspended || ("suspended" in content && content.suspended)) { const contentLog = JSON.stringify(JSON.stringify(content)); @@ -129,7 +136,7 @@ app.post( const playbackPolicyType = content.playbackPolicy?.type ?? 
"public"; if (user.createdAt < HACKER_DISABLE_CUTOFF_DATE) { - let limitReached = await freeTierLimitReached(content, user, req); + let limitReached = freeTierLimitReached(content, user, req); if (limitReached) { throw new ForbiddenError("Free tier user reached viewership limit"); } @@ -159,11 +166,15 @@ app.post( ); } - const query = []; - query.push(sql`signing_key.data->>'publicKey' = ${req.body.pub}`); - const [signingKeyOutput] = await db.signingKey.find(query, { - limit: 2, - }); + const [signingKeyOutput] = await cache.getOrSet( + `acl-signing-key-pub-${req.body.pub}`, + () => { + const query = [ + sql`signing_key.data->>'publicKey' = ${req.body.pub}`, + ]; + return db.signingKey.find(query, { limit: 2 }); + } + ); if (signingKeyOutput.length == 0) { console.log(` @@ -210,7 +221,9 @@ app.post( "Content is gated and requires an access key" ); } - const webhook = await db.webhook.get(content.playbackPolicy.webhookId); + const webhook = await db.webhook.get(content.playbackPolicy.webhookId, { + useCache: true, + }); if (!webhook) { console.log(` access-control: gate: content with playbackId=${playbackId} is gated but corresponding webhook not found for webhookId=${content.playbackPolicy.webhookId}, disallowing playback @@ -280,11 +293,11 @@ app.get("/public-key", async (req, res) => { }); }); -async function freeTierLimitReached( +function freeTierLimitReached( content: DBStream | WithID, user: User, req: Request -): Promise { +): boolean { if (!isFreeTierUser(user)) { return false; } diff --git a/packages/api/src/controllers/asset.ts b/packages/api/src/controllers/asset.ts index 7ecba295b8..162a7fd798 100644 --- a/packages/api/src/controllers/asset.ts +++ b/packages/api/src/controllers/asset.ts @@ -292,7 +292,7 @@ async function validateAssetPlaybackPolicy( } async function getActiveObjectStore(id: string) { - const os = await db.objectStore.get(id); + const os = await db.objectStore.get(id, { useCache: true }); if (!os || os.deleted || os.disabled) { throw 
new Error("Object store not found or disabled"); } diff --git a/packages/api/src/controllers/playback.test.ts b/packages/api/src/controllers/playback.test.ts index ee2a985ad5..f1b641d3b3 100644 --- a/packages/api/src/controllers/playback.test.ts +++ b/packages/api/src/controllers/playback.test.ts @@ -5,6 +5,7 @@ import { WithID } from "../store/types"; import { db } from "../store"; import { DBStream } from "../store/stream-table"; import { DBSession } from "../store/session-table"; +import { cache } from "../store/cache"; const EXPECTED_CROSS_USER_ASSETS_CUTOFF_DATE = Date.parse( "2023-06-06T00:00:00.000Z" @@ -525,6 +526,7 @@ describe("controllers/playback", () => { await db.asset.update(asset2.id, { createdAt: EXPECTED_CROSS_USER_ASSETS_CUTOFF_DATE - 1000, }); + cache.flush(); }); it("should return playback URL asset of user from CID", async () => { diff --git a/packages/api/src/controllers/playback.ts b/packages/api/src/controllers/playback.ts index ed72e1f834..0ecac3acb4 100644 --- a/packages/api/src/controllers/playback.ts +++ b/packages/api/src/controllers/playback.ts @@ -21,6 +21,7 @@ import { NotFoundError, UnprocessableEntityError } from "../store/errors"; import { isExperimentSubject } from "../store/experiment-table"; import logger from "../logger"; import { getRunningRecording } from "./session"; +import { cache } from "../store/cache"; /** * CROSS_USER_ASSETS_CUTOFF_DATE represents the cut-off date for cross-account @@ -115,7 +116,7 @@ const getAssetPlaybackInfo = async ( ingest: string, asset: WithID ) => { - const os = await db.objectStore.get(asset.objectStoreId); + const os = await db.objectStore.get(asset.objectStoreId, { useCache: true }); if (!os || os.deleted || os.disabled) { return null; } @@ -136,32 +137,17 @@ const getAssetPlaybackInfo = async ( ); }; +type PlaybackResource = { + stream?: DBStream; + session?: DBSession; + asset?: WithID; +}; + export async function getResourceByPlaybackId( id: string, user?: User, - cutoffDate?: number, - 
origin?: string -): Promise<{ stream?: DBStream; session?: DBSession; asset?: WithID }> { - let asset = - (await db.asset.getByPlaybackId(id)) ?? - (await db.asset.getByIpfsCid(id, user, cutoffDate)) ?? - (await db.asset.getBySourceURL("ipfs://" + id, user, cutoffDate)) ?? - (await db.asset.getBySourceURL("ar://" + id, user, cutoffDate)); - - if (asset && !asset.deleted) { - if (asset.status.phase !== "ready" && !asset.sourcePlaybackReady) { - throw new UnprocessableEntityError("asset is not ready for playback"); - } - if (asset.userId !== user?.id && cutoffDate) { - console.log( - `Returning cross-user asset for playback. ` + - `userId=${user?.id} userEmail=${user?.email} origin=${origin} ` + - `assetId=${asset.id} assetUserId=${asset.userId} playbackId=${asset.playbackId}` - ); - } - return { asset }; - } - + cutoffDate?: number +): Promise { let stream = await db.stream.getByPlaybackId(id); if (!stream) { const streamById = await db.stream.get(id); @@ -174,6 +160,16 @@ export async function getResourceByPlaybackId( return { stream }; } + const asset = + (await db.asset.getByPlaybackId(id)) ?? + (await db.asset.getByIpfsCid(id, user, cutoffDate)) ?? + (await db.asset.getBySourceURL("ipfs://" + id, user, cutoffDate)) ?? + (await db.asset.getBySourceURL("ar://" + id, user, cutoffDate)); + + if (asset && !asset.deleted) { + return { asset }; + } + const session = await db.session.get(id); if (session && !session.deleted) { return { session }; @@ -181,6 +177,7 @@ export async function getResourceByPlaybackId( return {}; } + async function getAttestationPlaybackInfo( config: CliArgs, ingest: string, @@ -224,14 +221,30 @@ async function getPlaybackInfo( withRecordings?: boolean ): Promise { const cutoffDate = isCrossUserQuery ? 
null : CROSS_USER_ASSETS_CUTOFF_DATE; - let { stream, asset, session } = await getResourceByPlaybackId( - id, - req.user, - cutoffDate, - origin - ); + const cacheKey = `playbackInfo-${id}-user-${req.user?.id}-cutoff-${cutoffDate}`; + let resource = cache.get(cacheKey); + if (!resource) { + resource = await getResourceByPlaybackId(id, req.user, cutoffDate); + + const ttl = + resource.asset && resource.asset.status.phase !== "ready" ? 5 : 120; + cache.set(cacheKey, resource, ttl); + } + + let { stream, asset, session } = resource; if (asset) { + if (asset.status.phase !== "ready" && !asset.sourcePlaybackReady) { + throw new UnprocessableEntityError("asset is not ready for playback"); + } + if (asset.userId !== req.user?.id && cutoffDate) { + console.log( + `Returning cross-user asset for playback. ` + + `userId=${req.user?.id} userEmail=${req.user?.email} origin=${origin} ` + + `assetId=${asset.id} assetUserId=${asset.userId} playbackId=${asset.playbackId}` + ); + } + return await getAssetPlaybackInfo(req.config, ingest, asset); } diff --git a/packages/api/src/controllers/session.ts b/packages/api/src/controllers/session.ts index f0e51184ba..b2073912e2 100644 --- a/packages/api/src/controllers/session.ts +++ b/packages/api/src/controllers/session.ts @@ -246,7 +246,9 @@ export async function buildRecordingUrl( recordCatalystObjectStoreId: string, secondaryRecordObjectStoreId: string ) { - const os = await db.objectStore.get(recordCatalystObjectStoreId); + const os = await db.objectStore.get(recordCatalystObjectStoreId, { + useCache: true, + }); let urlPrefix = pathJoin(os.publicUrl, session.playbackId, session.id); let manifestUrl = pathJoin(urlPrefix, "output.m3u8"); @@ -265,7 +267,9 @@ export async function buildRecordingUrl( }; } - const secondaryOs = await db.objectStore.get(secondaryRecordObjectStoreId); + const secondaryOs = await db.objectStore.get(secondaryRecordObjectStoreId, { + useCache: true, + }); urlPrefix = pathJoin(secondaryOs.publicUrl, 
session.playbackId, session.id); manifestUrl = pathJoin(urlPrefix, "output.m3u8"); diff --git a/packages/api/src/controllers/stream.ts b/packages/api/src/controllers/stream.ts index 9c2be47627..2b5750399c 100644 --- a/packages/api/src/controllers/stream.ts +++ b/packages/api/src/controllers/stream.ts @@ -407,7 +407,7 @@ export async function getRecordingPlaybackUrl( return null; } - const os = await db.objectStore.get(objectStoreId); + const os = await db.objectStore.get(objectStoreId, { useCache: true }); url = pathJoin(os.publicUrl, session.playbackId, session.id, "output.m3u8"); } catch (e) { console.log(` diff --git a/packages/api/src/middleware/auth.test.ts b/packages/api/src/middleware/auth.test.ts index 8560cb8ce1..23db58bccb 100644 --- a/packages/api/src/middleware/auth.test.ts +++ b/packages/api/src/middleware/auth.test.ts @@ -136,8 +136,10 @@ describe("auth middleware", () => { let nonAdminApiKey: string; let client: TestClient; - const setAccess = (token: string, rules?: ApiToken["access"]["rules"]) => - db.apiToken.update(token, { access: { rules } }); + const setAccess = async ( + token: string, + rules?: ApiToken["access"]["rules"] + ) => db.apiToken.update(token, { access: { rules } }); const fetchStatus = async (method: string, path: string) => { const res = await client.fetch(path, { method }); @@ -375,7 +377,7 @@ describe("auth middleware", () => { } = await setupUsers(server, mockAdminUserInput, mockNonAdminUserInput)); }); - const setAccess = (token: string, access?: ApiToken["access"]) => + const setAccess = async (token: string, access?: ApiToken["access"]) => db.apiToken.update(token, { access }); const expectResponse = (res: Response) => diff --git a/packages/api/src/middleware/auth.ts b/packages/api/src/middleware/auth.ts index a61370a497..495c1063c4 100644 --- a/packages/api/src/middleware/auth.ts +++ b/packages/api/src/middleware/auth.ts @@ -83,7 +83,7 @@ function authenticator(): RequestHandler { if (!tokenId) { throw new 
UnauthorizedError(`no authorization token provided`); } - tokenObject = await db.apiToken.get(tokenId); + tokenObject = await db.apiToken.get(tokenId, { useCache: true }); const matchesBasicUser = tokenObject?.userId === basicUser?.name; if (!tokenObject || (isBasic && !matchesBasicUser)) { throw new UnauthorizedError(`no token ${tokenId} found`); @@ -108,7 +108,8 @@ function authenticator(): RequestHandler { ); } - user = await db.user.get(userId); + user = await db.user.get(userId, { useCache: true }); + if (!user) { throw new UnauthorizedError( `no user found from authorization header: ${authHeader}` @@ -118,10 +119,12 @@ function authenticator(): RequestHandler { throw new ForbiddenError(`user is suspended`); } + req.token = tokenObject; req.user = user; + // UI admins must have a JWT req.isUIAdmin = user.admin && authScheme === "jwt"; - req.token = tokenObject; + return next(); }; } diff --git a/packages/api/src/parse-cli.ts b/packages/api/src/parse-cli.ts index c1779721e9..ea398c33e8 100755 --- a/packages/api/src/parse-cli.ts +++ b/packages/api/src/parse-cli.ts @@ -142,6 +142,11 @@ export default function parseCli(argv?: string | readonly string[]) { describe: "url of a postgres read replica database", type: "string", }, + "default-cache-ttl": { + describe: "default TTL for entries cached in memory, in seconds", + type: "number", + default: 120, + }, "amqp-url": { describe: "the RabbitMQ Url", type: "string", diff --git a/packages/api/src/store/cache.ts b/packages/api/src/store/cache.ts new file mode 100644 index 0000000000..bb1cde807e --- /dev/null +++ b/packages/api/src/store/cache.ts @@ -0,0 +1,59 @@ +import _ from "lodash"; +import NodeCache from "node-cache"; + +class Cache { + storage: NodeCache; + + init(options?: NodeCache.Options) { + if (this.storage) { + throw new Error("Cache already initialized"); + } + + this.storage = new NodeCache({ + stdTTL: 120, + checkperiod: 60, + ...options, + }); + } + + get(cacheKey: string) { + if (!this.storage) 
return; + + const content = this.storage.get(cacheKey) as T; + // always make copies in case caller mutates the object (yeah we still have that) + return content && _.cloneDeep(content); + } + + set(cacheKey: string, content: T, ttl?: string | number) { + if (!this.storage) return; + + content = _.cloneDeep(content); + this.storage.set(cacheKey, content, ttl); + } + + delete(cacheKey: string) { + if (!this.storage) return; + this.storage.del(cacheKey); + } + + async getOrSet( + cacheKey: string, + getter: () => Promise, + ttl?: string | number + ) { + let content = this.get(cacheKey); + if (!content) { + content = await getter(); + this.set(cacheKey, content, ttl); + } + return content; + } + + // Test helper to clear the cache + flush() { + if (!this.storage) return; + this.storage.flushAll(); + } +} + +export const cache = new Cache(); diff --git a/packages/api/src/store/table.ts b/packages/api/src/store/table.ts index 38f988bf72..06fceb91b8 100644 --- a/packages/api/src/store/table.ts +++ b/packages/api/src/store/table.ts @@ -14,6 +14,7 @@ import { DBLegacyObject, FieldSpec, } from "./types"; +import { cache } from "./cache"; const DEFAULT_SORT = "id ASC"; @@ -44,12 +45,26 @@ export default class Table { } // get a single document by id - async get(id: string, opts: GetOptions = { useReplica: true }): Promise { + async get( + id: string, + { useReplica = true, useCache }: GetOptions = {} + ): Promise { if (!id) { throw new Error("missing id"); + } else if (useCache && !useReplica) { + throw new Error("can't cache a non-replica query"); } + + const cacheKey = this.rowCacheKey(id); + if (useCache) { + const cached = cache.get(cacheKey); + if (cached) { + return cached as T; + } + } + let res: QueryResult; - if (!opts.useReplica) { + if (!useReplica) { res = await this.db.query( sql`SELECT data FROM ` .append(this.name) @@ -66,7 +81,12 @@ export default class Table { if (res.rowCount < 1) { return null; } - return res.rows[0].data as T; + + const data = 
res.rows[0].data as T; + // always cache on read, even if not returning from cache + cache.set(cacheKey, data); + + return data; } async getMany( @@ -196,6 +216,8 @@ export default class Table { if (res.rowCount < 1) { throw new NotFoundError(`${this.name} id=${doc.id} not found`); } + + cache.set(this.rowCacheKey(doc.id), doc); } async update( @@ -221,6 +243,10 @@ export default class Table { const res = await this.db.query(q); + if (typeof query === "string") { + cache.delete(this.rowCacheKey(query)); + } + if (res.rowCount < 1 && throwIfEmpty) { throw new NotFoundError(`${this.name} id=${doc.id} not found`); } @@ -396,4 +422,8 @@ export default class Table { } logger.info(`Created ${unique} index ${indexName} on ${this.name}`); } + + private rowCacheKey(id: string) { + return `db-get-${this.name}-by-id-${id}`; + } } diff --git a/packages/api/src/store/types.ts b/packages/api/src/store/types.ts index a6ba0ba9f9..8d5413dcd7 100644 --- a/packages/api/src/store/types.ts +++ b/packages/api/src/store/types.ts @@ -42,6 +42,7 @@ export interface FindOptions extends QueryOptions { export interface GetOptions { useReplica?: boolean; + useCache?: boolean; } export interface UpdateOptions {