Skip to content

Commit

Permalink
api/cannon: Re-fix local IP check with better error handling (#2124)
Browse files Browse the repository at this point in the history
* Reapply "api: Enable local IP verification on webhooks (#2118)" (#2123)

This reverts commit 5d92474.

* api/cannon: Handle not found DNSs

* api/cannon: Make DNS resolution errors non fatal

* api/cannon: Handle IP hostnames gracefully

* api/store: Avoid sql injection on webhook status update

* [DEV] Add some debug logging

* api/store: Fix updateStatus query

Can't parametrize table name

* Revert "[DEV] Add some debug logging"

This reverts commit 39e9b14.

* api: Handle localhost explicitly

* api/store/updateStatus: Remove surrounding quotes on params

* api/store: Add tests to local webhook logic

* Optimize imports
  • Loading branch information
victorges committed May 7, 2024
1 parent 9f0b8c9 commit c1e63fd
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 50 deletions.
10 changes: 5 additions & 5 deletions packages/api/src/app-router.ts
Expand Up @@ -53,6 +53,9 @@ const PROM_BUNDLE_OPTS: promBundle.Opts = {
},
};

const isTest =
process.env.NODE_ENV === "test" || process.env.NODE_ENV === "development";

export default async function appRouter(params: CliArgs) {
const {
httpPrefix,
Expand Down Expand Up @@ -125,15 +128,12 @@ export default async function appRouter(params: CliArgs) {
recordCatalystObjectStoreId,
secondaryRecordObjectStoreId,
supportAddr,
verifyUrls: true,
skipUrlVerification: isTest,
queue,
});
await webhookCannon.start();

if (
process.env.NODE_ENV === "test" ||
process.env.NODE_ENV === "development"
) {
if (isTest) {
await setupTestTus();
} else if (vodObjectStoreId) {
await setupTus(vodObjectStoreId);
Expand Down
12 changes: 7 additions & 5 deletions packages/api/src/store/webhook-table.ts
Expand Up @@ -49,12 +49,14 @@ export default class WebhookTable extends Table<DBWebhook> {
}

async updateStatus(id: string, status: DBWebhook["status"]) {
const statusStr = JSON.stringify(status);
const res = await this.db.query(
`UPDATE ${
this.name
} SET data = jsonb_set(data, '{status}', case when data->'status' is null then '{}' else data->'status' end || '${JSON.stringify(
status
)}') WHERE id = '${id}'`
sql``
.append(`UPDATE ${this.name} `) // table name can't be parameterized, append a raw string
.append(
sql`SET data = jsonb_set(data, '{status}', case when data->'status' is null then '{}' else data->'status' end || ${statusStr}) `
)
.append(sql`WHERE id = ${id}`)
);

if (res.rowCount < 1) {
Expand Down
162 changes: 162 additions & 0 deletions packages/api/src/webhooks/cannon.test.ts
Expand Up @@ -219,6 +219,47 @@ describe("webhook cannon", () => {
expect(called).toBe(true);
});

describe("local webhook", () => {
beforeAll(() => {
server.webhook.skipUrlVerification = false;
});

afterAll(() => {
server.webhook.skipUrlVerification = true;
});

it("should not call local webhooks", async () => {
// we create the same mock webhook, but given url verification is enabled it should not be called
const res = await client.post("/webhook", {
...mockWebhook,
name: "test non admin",
});
const resJson = await res.json();
console.log("webhook body: ", resJson);
expect(res.status).toBe(201);
expect(resJson.name).toBe("test non admin");

const sem = semaphore();
let called = false;
webhookCallback = () => {
called = true;
sem.release();
};

await server.queue.publishWebhook("events.stream.started", {
type: "webhook_event",
id: "webhook_test_12",
timestamp: Date.now(),
streamId: "streamid",
event: "stream.started",
userId: nonAdminUser.id,
});

await sem.wait(3000);
expect(called).toBe(false);
});
});

it("should call multiple webhooks", async () => {
let res = await client.post("/webhook", {
...mockWebhook,
Expand Down Expand Up @@ -355,4 +396,125 @@ describe("webhook cannon", () => {
expect(calledCounts).toEqual([4, 2]);
});
});

describe("local IP check", () => {
beforeAll(() => {
server.webhook.skipUrlVerification = false;
});

afterAll(() => {
server.webhook.skipUrlVerification = true;
});

const expectIsLocal = async (
url: string,
isLocal: boolean,
ips?: string[]
) => {
expect(await server.webhook.checkIsLocalIp(url, false)).toMatchObject({
isLocal,
ips,
});
};

it("should flag local IPs", async () => {
await expectIsLocal("http://127.0.0.1/test", true, ["127.0.0.1"]);
await expectIsLocal("http://[::1]/test", true, ["::1"]);
});

it("should flag private IPs", async () => {
await expectIsLocal("http://10.42.0.1/test", true, ["10.42.0.1"]);
await expectIsLocal("http://172.16.3.4/test", true, ["172.16.3.4"]);
await expectIsLocal("http://[fd12:3456:789a:1::1]/test", true, [
"fd12:3456:789a:1::1",
]);
});

it("should flag loopback addresses", async () => {
await expectIsLocal("http://localhost:1234/test", true, ["127.0.0.1"]);
await expectIsLocal("http://ip6-localhost:1234/test", true, ["::1"]);
await expectIsLocal("http://ip6-loopback:1234/test", true, ["::1"]);
});

it("should not flag public IPs", async () => {
await expectIsLocal("http://172.67.149.35/test", false, [
"172.67.149.35",
]);
await expectIsLocal("http://[2606:4700:3037::ac43:9523]/test", false, [
"2606:4700:3037::ac43:9523",
]);
});

describe("domain resolution", () => {
let prevResolver;
let resolverMock: ReturnType<typeof createResolverMock>;

const createResolverMock = () => ({
resolve4: jest.fn<Promise<string[]>, any, any>(),
resolve6: jest.fn<Promise<string[]>, any, any>(),
});

beforeEach(() => {
prevResolver = server.webhook.resolver;
server.webhook.resolver = resolverMock = createResolverMock() as any;
});

afterEach(() => {
server.webhook.resolver = prevResolver;
});

it("should not flag domains that resolve to public IPs", async () => {
resolverMock.resolve4.mockReturnValueOnce(
Promise.resolve(["172.67.149.35"])
);
resolverMock.resolve6.mockReturnValueOnce(
Promise.resolve(["2606:4700:3037::ac43:9523"])
);

await expectIsLocal("http://livepeer.studio/mock", false, [
"172.67.149.35",
"2606:4700:3037::ac43:9523",
]);
expect(resolverMock.resolve4.mock.calls).toHaveLength(1);
expect(resolverMock.resolve4.mock.calls[0][0]).toEqual(
"livepeer.studio"
);
expect(resolverMock.resolve6.mock.calls).toHaveLength(1);
expect(resolverMock.resolve6.mock.calls[0][0]).toEqual(
"livepeer.studio"
);
});

const privateTestCases = [
{ name: "IPv4-only", ipv4: ["10.42.0.10"], ipv6: [] },
{ name: "IPv6-only", ipv4: [], ipv6: ["::1"] },
{ name: "IPv4 and IPv6", ipv4: ["172.0.0.1"], ipv6: ["::1"] },
{
name: "mixed private and public IPs",
ipv4: ["172.67.149.35", "172.16.34.123"],
ipv6: ["2606:4700:3037::ac43:9523", "fd12:3456:789a:1::1"],
},
];

for (const { name, ipv4, ipv6 } of privateTestCases) {
it(`should flag domains that resolve to private IPs (${name})`, async () => {
resolverMock.resolve4.mockReturnValueOnce(Promise.resolve(ipv4));
resolverMock.resolve6.mockReturnValueOnce(Promise.resolve(ipv6));

await expectIsLocal("http://local.mydomain.com/test", true, [
...ipv4,
...ipv6,
]);
expect(resolverMock.resolve4.mock.calls).toHaveLength(1);
expect(resolverMock.resolve4.mock.calls[0][0]).toEqual(
"local.mydomain.com"
);
expect(resolverMock.resolve6.mock.calls).toHaveLength(1);
expect(resolverMock.resolve6.mock.calls[0][0]).toEqual(
"local.mydomain.com"
);
});
}
});
});
});
88 changes: 48 additions & 40 deletions packages/api/src/webhooks/cannon.ts
@@ -1,7 +1,8 @@
import { ConsumeMessage } from "amqplib";
import { promises as dns } from "dns";
import dns from "dns";
import isLocalIP from "is-local-ip";
import _ from "lodash";
import { isIP } from "net";
import { Response } from "node-fetch";
import { parse as parseUrl } from "url";
import { v4 as uuid } from "uuid";
Expand Down Expand Up @@ -42,7 +43,7 @@ function isRuntimeError(err: any): boolean {

export default class WebhookCannon {
running: boolean;
verifyUrls: boolean;
skipUrlVerification: boolean;
frontendDomain: string;
sendgridTemplateId: string;
sendgridApiKey: string;
Expand All @@ -51,7 +52,7 @@ export default class WebhookCannon {
secondaryVodObjectStoreId: string;
recordCatalystObjectStoreId: string;
secondaryRecordObjectStoreId: string;
resolver: any;
resolver: dns.promises.Resolver;
queue: Queue;
constructor({
frontendDomain,
Expand All @@ -62,11 +63,11 @@ export default class WebhookCannon {
secondaryVodObjectStoreId,
recordCatalystObjectStoreId,
secondaryRecordObjectStoreId,
verifyUrls,
skipUrlVerification,
queue,
}) {
this.running = true;
this.verifyUrls = verifyUrls;
this.skipUrlVerification = skipUrlVerification;
this.frontendDomain = frontendDomain;
this.sendgridTemplateId = sendgridTemplateId;
this.sendgridApiKey = sendgridApiKey;
Expand All @@ -75,7 +76,7 @@ export default class WebhookCannon {
this.secondaryVodObjectStoreId = secondaryVodObjectStoreId;
this.recordCatalystObjectStoreId = recordCatalystObjectStoreId;
this.secondaryRecordObjectStoreId = secondaryRecordObjectStoreId;
this.resolver = new dns.Resolver();
this.resolver = new dns.promises.Resolver();
this.queue = queue;
// this.start();
}
Expand Down Expand Up @@ -208,8 +209,7 @@ export default class WebhookCannon {
return;
}
try {
// TODO Activate URL Verification
await this._fireHook(trigger, false);
await this._fireHook(trigger);
} catch (err) {
console.log("_fireHook error", err);
await this.retry(trigger, null, err);
Expand All @@ -223,10 +223,6 @@ export default class WebhookCannon {
this.running = false;
}

disableUrlVerify() {
this.verifyUrls = false;
}

public calcBackoff = (lastInterval?: number): number => {
if (!lastInterval || lastInterval < 1000) {
return 5000;
Expand Down Expand Up @@ -328,7 +324,7 @@ export default class WebhookCannon {
);
}

async _fireHook(trigger: messages.WebhookTrigger, verifyUrl = true) {
async _fireHook(trigger: messages.WebhookTrigger) {
const { event, webhook, stream, user } = trigger;
if (!event || !webhook || !user) {
console.error(
Expand All @@ -338,34 +334,15 @@ export default class WebhookCannon {
return;
}
console.log(`trying webhook ${webhook.name}: ${webhook.url}`);
let ips, urlObj, isLocal;
if (verifyUrl) {
try {
urlObj = parseUrl(webhook.url);
if (urlObj.host) {
ips = await this.resolver.resolve4(urlObj.hostname);
}
} catch (e) {
console.error("error: ", e);
throw e;
}
}

// This is mainly useful for local testing
if (user.admin || verifyUrl === false) {
isLocal = false;
} else {
try {
if (ips && ips.length) {
isLocal = isLocalIP(ips[0]);
} else {
isLocal = true;
}
} catch (e) {
console.error("isLocal Error", isLocal, e);
throw e;
}
}
const { ips, isLocal } = await this.checkIsLocalIp(
webhook.url,
user.admin
).catch((e) => {
console.error("error checking if is local IP: ", e);
return { ips: [], isLocal: false };
});

if (isLocal) {
// don't fire this webhook.
console.log(
Expand Down Expand Up @@ -464,6 +441,37 @@ export default class WebhookCannon {
}
}

public async checkIsLocalIp(url: string, isAdmin: boolean) {
if (isAdmin || this.skipUrlVerification) {
// this is mainly useful for local testing
return { ips: [], isLocal: false };
}

const emptyIfNotFound = (err) => {
if ([dns.NODATA, dns.NOTFOUND, dns.BADFAMILY].includes(err.code)) {
return [] as string[];
}
throw err;
};

const { hostname } = parseUrl(url);
if (["localhost", "ip6-localhost", "ip6-loopback"].includes(hostname)) {
// dns.resolve functions do not take /etc/hosts into account, so we need to handle these separately
const ips = hostname === "localhost" ? ["127.0.0.1"] : ["::1"];
return { ips, isLocal: true };
}

const ips = isIP(hostname)
? [hostname]
: await Promise.all([
this.resolver.resolve4(hostname).catch(emptyIfNotFound),
this.resolver.resolve6(hostname).catch(emptyIfNotFound),
]).then((ipsArrs) => ipsArrs.flat());

const isLocal = ips.some(isLocalIP);
return { ips, isLocal };
}

async storeTriggerStatus(
webhook: DBWebhook,
triggerTime: number,
Expand Down

0 comments on commit c1e63fd

Please sign in to comment.