Skip to content

Commit

Permalink
stream/session: add projectId scoping
Browse files Browse the repository at this point in the history
  • Loading branch information
emranemran committed Mar 18, 2024
1 parent bf4d640 commit dbb53cc
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 23 deletions.
9 changes: 7 additions & 2 deletions packages/api/src/controllers/session.ts
Expand Up @@ -2,7 +2,7 @@ import { Router, Request } from "express";
import sql from "sql-template-strings";

import { authorizer } from "../middleware";
import { User } from "../schema/types";
import { User, Project } from "../schema/types";
import { db } from "../store";
import { DBSession } from "../store/session-table";
import { pathJoin } from "../controllers/helpers";
Expand Down Expand Up @@ -36,6 +36,7 @@ const fieldsMap: FieldsMap = {
userId: `session.data->>'userId'`,
"user.email": { val: `users.data->>'email'`, type: "full-text" },
parentId: `session.data->>'parentId'`,
projectId: `session.data->>'projectId'`,
record: { val: `session.data->'record'`, type: "boolean" },
sourceSegments: `session.data->'sourceSegments'`,
transcodedSegments: {
Expand Down Expand Up @@ -78,6 +79,9 @@ app.get("/", authorizer({}), async (req, res, next) => {
if (parentId) {
query.push(sql`session.data->>'parentId' = ${parentId}`);
}
query.push(
sql`coalesce(session.data->>'projectId', '') = ${req.project?.id || ""}`
);

if (!order) {
order = "lastSeen-true,createdAt-true";
Expand Down Expand Up @@ -148,7 +152,8 @@ app.get("/:id", authorizer({}), async (req, res) => {
!session ||
((session.userId !== req.user.id || session.deleted) &&
!req.user.admin &&
!LVPR_SDK_EMAILS.includes(req.user.email))
!LVPR_SDK_EMAILS.includes(req.user.email)) ||
session.projectId !== req.project?.id
) {
// do not reveal that session exists
res.status(404);
Expand Down
78 changes: 57 additions & 21 deletions packages/api/src/controllers/stream.ts
Expand Up @@ -17,6 +17,7 @@ import {
StreamPatchPayload,
StreamSetActivePayload,
User,
Project,
} from "../schema/types";
import { db } from "../store";
import { DBSession } from "../store/session-table";
Expand Down Expand Up @@ -332,6 +333,7 @@ const fieldsMap: FieldsMap = {
"user.email": { val: `users.data->>'email'`, type: "full-text" },
parentId: `stream.data->>'parentId'`,
playbackId: `stream.data->>'playbackId'`,
projectId: `stream.data->>'projectId'`,
record: { val: `stream.data->'record'`, type: "boolean" },
suspended: { val: `stream.data->'suspended'`, type: "boolean" },
sourceSegmentsDuration: {
Expand Down Expand Up @@ -388,6 +390,9 @@ app.get("/", authorizer({}), async (req, res) => {
if (userId) {
query.push(sql`stream.data->>'userId' = ${userId}`);
}
query.push(
sql`coalesce(stream.data->>'projectId', '') = ${req.project?.id || ""}`
);

if (!order) {
order = "lastSeen-true,createdAt-true";
Expand Down Expand Up @@ -554,6 +559,7 @@ app.get("/:parentId/sessions", authorizer({}), async (req, res) => {
query.push(sql`(data->'lastSeen')::bigint > 0`);
query.push(sql`(data->'sourceSegmentsDuration')::bigint > 0`);
query.push(sql`data->>'partialSession' IS NULL`);
query.push(sql`coalesce(data->>'projectId', '') = ${req.project?.id || ""}`);
if (record) {
if (record === "true" || record === "1") {
query.push(sql`data->>'record' = 'true'`);
Expand Down Expand Up @@ -626,7 +632,8 @@ app.get("/sessions/:parentId", authorizer({}), async (req, res) => {
if (
!stream ||
stream.deleted ||
(stream.userId !== req.user.id && !req.isUIAdmin)
(stream.userId !== req.user.id && !req.isUIAdmin) ||
stream.projectId !== req.project?.id
) {
res.status(404);
return res.json({ errors: ["not found"] });
Expand Down Expand Up @@ -669,6 +676,7 @@ app.get("/user/:userId", authorizer({}), async (req, res) => {
} else if (sessionsonly) {
query.push(sql`data->>'parentId' IS NOT NULL`);
}
query.push(sql`coalesce(data->>'projectId', '') = ${req.project?.id || ""}`);

const [streams, newCursor] = await db.stream.find(query, {
cursor,
Expand Down Expand Up @@ -700,7 +708,8 @@ app.get("/:id", authorizer({}), async (req, res) => {
let stream = await db.stream.get(req.params.id);
if (
!stream ||
((stream.userId !== req.user.id || stream.deleted) && !req.user.admin)
((stream.userId !== req.user.id || stream.deleted) && !req.user.admin) ||
stream.projectId !== req.project?.id
) {
// do not reveal that stream exists
res.status(404);
Expand Down Expand Up @@ -743,7 +752,8 @@ app.get("/playback/:playbackId", authorizer({}), async (req, res) => {
});
if (
!stream ||
((stream.userId !== req.user.id || stream.deleted) && !req.user.admin)
((stream.userId !== req.user.id || stream.deleted) && !req.user.admin) ||
stream.projectId !== req.project?.id
) {
res.status(404);
return res.json({ errors: ["not found"] });
Expand All @@ -765,7 +775,8 @@ app.get("/key/:streamKey", authorizer({}), async (req, res) => {
);
if (
!docs.length ||
((docs[0].userId !== req.user.id || docs[0].deleted) && !req.user.admin)
((docs[0].userId !== req.user.id || docs[0].deleted) && !req.user.admin) ||
docs[0].projectId !== req.project.id
) {
res.status(404);
return res.json({ errors: ["not found"] });
Expand Down Expand Up @@ -822,7 +833,8 @@ app.post(
if (
!stream ||
((stream.userId !== req.user.id || stream.deleted) &&
!(req.user.admin && !stream.deleted))
!(req.user.admin && !stream.deleted)) ||
stream.projectId !== req.project?.id
) {
// do not reveal that stream exists
res.status(404);
Expand All @@ -844,6 +856,7 @@ app.post(
...req.body,
kind: "stream",
userId: stream.userId,
projectId: stream.projectId,
renditions: {},
objectStoreId: stream.objectStoreId,
record,
Expand All @@ -863,14 +876,15 @@ app.post(

if (await db.session.get(sessionId)) {
logger.info(
`user session re-used for session.id=${sessionId} stream.id=${stream.id} stream.name='${stream.name}' playbackid=${stream.playbackId}`
`user session re-used for session.id=${sessionId} stream.id=${stream.id} stream.name='${stream.name}' playbackid=${stream.playbackId} stream.projectId=${stream.projectId}`
);
} else {
const session: DBSession = {
id: sessionId,
parentId: stream.id,
playbackId: stream.playbackId,
userId: stream.userId,
projectId: stream.projectId,
kind: "session",
version: "v2",
name: req.body.name,
Expand Down Expand Up @@ -911,9 +925,9 @@ app.post(
logger.info(
`stream session created for stream_id=${stream.id} stream_name='${
stream.name
}' playbackid=${stream.playbackId} session_id=${id} elapsed=${
Date.now() - start
}ms`
}' playbackid=${stream.playbackId} session_id=${id} projectid=${
stream.projectId
} elapsed=${Date.now() - start}ms`
);
}
);
Expand Down Expand Up @@ -1031,6 +1045,7 @@ app.put(
[
sql`data->>'userId' = ${req.user.id}`,
sql`data->>'deleted' IS NULL`,
sql`coalesce(data->>'projectId', '') = ${req.project?.id || ""}`,
...filters,
],
{ useReplica: false }
Expand Down Expand Up @@ -1101,6 +1116,7 @@ app.post(
sql`data->>'userId' = ${req.user.id}`,
sql`data->>'deleted' IS NULL`,
sql`data->'pull'->>'source' = ${payload.pull.source}`,
sql`coalesce(data->>'projectId', '') = ${req.project?.id || ""}`,
],
{ useReplica: false }
);
Expand Down Expand Up @@ -1176,6 +1192,7 @@ async function handleCreateStream(req: Request) {
renditions: {},
objectStoreId,
id,
projectId: req.project?.id ?? "",
createdAt,
streamKey,
playbackId,
Expand Down Expand Up @@ -1207,7 +1224,11 @@ app.post("/:id/heartbeat", authorizer({ anyAdmin: true }), async (req, res) => {
logger.info(`got /heartbeat for stream=${id}`);

const stream = await db.stream.get(id, { useReplica: false });
if (!stream || (stream.deleted && !req.user.admin)) {
if (
!stream ||
(stream.deleted && !req.user.admin) ||
stream.projectId !== req.project?.id
) {
res.status(404);
return res.json({ errors: ["not found"] });
}
Expand All @@ -1227,7 +1248,11 @@ app.put(
);

const stream = await db.stream.get(id, { useReplica: false });
if (!stream || (stream.deleted && !req.user.admin)) {
if (
!stream ||
(stream.deleted && !req.user.admin) ||
stream.projectId !== req.project?.id
) {
res.status(404);
return res.json({ errors: ["not found"] });
}
Expand Down Expand Up @@ -1536,7 +1561,7 @@ app.post(

const stream = await db.stream.get(req.params.id);

if (!stream || stream.deleted) {
if (!stream || stream.deleted || stream.projectId !== req.project?.id) {
res.status(404);
return res.json({ errors: ["stream not found"] });
}
Expand Down Expand Up @@ -1625,7 +1650,7 @@ app.patch(

const exists = stream && !stream.deleted;
const hasAccess = stream?.userId === req.user.id || req.isUIAdmin;
if (!exists || !hasAccess) {
if (!exists || !hasAccess || stream.projectId !== req.project?.id) {
res.status(404);
return res.json({ errors: ["not found"] });
}
Expand Down Expand Up @@ -1704,7 +1729,7 @@ app.patch(
app.patch("/:id/record", authorizer({}), async (req, res) => {
const { id } = req.params;
const stream = await db.stream.get(id);
if (!stream || stream.deleted) {
if (!stream || stream.deleted || stream.projectId !== req.project?.id) {
res.status(404);
return res.json({ errors: ["not found"] });
}
Expand Down Expand Up @@ -1737,7 +1762,8 @@ app.delete("/:id", authorizer({}), async (req, res) => {
if (
!stream ||
stream.deleted ||
(stream.userId !== req.user.id && !req.user.admin)
(stream.userId !== req.user.id && !req.user.admin) ||
stream.projectId !== req.project?.id
) {
res.status(404);
return res.json({ errors: ["not found"] });
Expand Down Expand Up @@ -1767,7 +1793,8 @@ app.delete("/", authorizer({}), async (req, res) => {
const streams = await db.stream.getMany(ids);
if (
streams.length !== ids.length ||
streams.some((s) => s.userId !== req.user.id)
streams.some((s) => s.userId !== req.user.id) ||
streams.some((s) => s.projectId !== req.project.id)
) {
res.status(404);
return res.json({ errors: ["not found"] });
Expand Down Expand Up @@ -1800,7 +1827,8 @@ app.get("/:id/info", authorizer({}), async (req, res) => {
}
if (
!stream ||
(!req.user.admin && (stream.deleted || stream.userId !== req.user.id))
(!req.user.admin && (stream.deleted || stream.userId !== req.user.id)) ||
stream.projectId !== req.project?.id
) {
res.status(404);
return res.json({
Expand Down Expand Up @@ -1836,7 +1864,12 @@ app.get("/:id/config", authorizer({ anyAdmin: true }), async (req, res) => {
let stream = await db.stream.getByPlaybackId(id, {
useReplica: false,
});
if (!stream || stream.deleted || stream.suspended) {
if (
!stream ||
stream.deleted ||
stream.suspended ||
stream.projectId !== req.project?.id
) {
res.status(404);
return res.json({
errors: ["not found"],
Expand Down Expand Up @@ -1870,7 +1903,8 @@ app.patch("/:id/suspended", authorizer({}), async (req, res) => {
const stream = await db.stream.get(id);
if (
!stream ||
(!req.user.admin && (stream.deleted || stream.userId !== req.user.id))
(!req.user.admin && (stream.deleted || stream.userId !== req.user.id)) ||
stream.projectId !== req.project?.id
) {
res.status(404);
return res.json({ errors: ["not found"] });
Expand All @@ -1893,7 +1927,8 @@ app.post(
const stream = await db.stream.get(id);
if (
!stream ||
(!req.user.admin && (stream.deleted || stream.userId !== req.user.id))
(!req.user.admin && (stream.deleted || stream.userId !== req.user.id)) ||
stream.projectId !== req.project?.id
) {
res.status(404);
return res.json({ errors: ["not found"] });
Expand All @@ -1916,7 +1951,8 @@ app.delete("/:id/terminate", authorizer({}), async (req, res) => {
const stream = await db.stream.get(id);
if (
!stream ||
(!req.user.admin && (stream.deleted || stream.userId !== req.user.id))
(!req.user.admin && (stream.deleted || stream.userId !== req.user.id)) ||
stream.projectId !== req.project?.id
) {
res.status(404);
return res.json({ errors: ["not found"] });
Expand Down
8 changes: 8 additions & 0 deletions packages/api/src/schema/api-schema.yaml
Expand Up @@ -531,6 +531,10 @@ components:
height: 720
items:
$ref: "#/components/schemas/ffmpeg-profile"
projectId:
type: string
description: The ID of the project
example: aac12556-4d65-4d34-9fb6-d1f0985eb0a9
record:
description: |
Should this stream be recorded? Uses default settings. For more
Expand Down Expand Up @@ -786,6 +790,10 @@ components:
type: string
example: de7818e7-610a-4057-8f6f-b785dc1e6f88
description: Points to parent stream object
projectId:
type: string
description: The ID of the project
example: aac12556-4d65-4d34-9fb6-d1f0985eb0a9
record:
description: |
Whether the stream should be recorded. Uses default settings. For more
Expand Down

0 comments on commit dbb53cc

Please sign in to comment.