Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api/cannon: Re-fix local IP check with better error handling #2124

Merged
merged 13 commits into from May 7, 2024
Merged
10 changes: 5 additions & 5 deletions packages/api/src/app-router.ts
Expand Up @@ -53,6 +53,9 @@ const PROM_BUNDLE_OPTS: promBundle.Opts = {
},
};

const isTest =
process.env.NODE_ENV === "test" || process.env.NODE_ENV === "development";

export default async function appRouter(params: CliArgs) {
const {
httpPrefix,
Expand Down Expand Up @@ -125,15 +128,12 @@ export default async function appRouter(params: CliArgs) {
recordCatalystObjectStoreId,
secondaryRecordObjectStoreId,
supportAddr,
verifyUrls: true,
skipUrlVerification: isTest,
queue,
});
await webhookCannon.start();

if (
process.env.NODE_ENV === "test" ||
process.env.NODE_ENV === "development"
) {
if (isTest) {
await setupTestTus();
} else if (vodObjectStoreId) {
await setupTus(vodObjectStoreId);
Expand Down
12 changes: 7 additions & 5 deletions packages/api/src/store/webhook-table.ts
Expand Up @@ -49,12 +49,14 @@ export default class WebhookTable extends Table<DBWebhook> {
}

async updateStatus(id: string, status: DBWebhook["status"]) {
const statusStr = JSON.stringify(status);
const res = await this.db.query(
`UPDATE ${
this.name
} SET data = jsonb_set(data, '{status}', case when data->'status' is null then '{}' else data->'status' end || '${JSON.stringify(
status
)}') WHERE id = '${id}'`
sql``
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change related to the local IP change? Or is it just refactor? Both are fine, I'm just trying to connect the dots.

Copy link
Member Author

@victorges victorges Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not related, it is a fix to use proper SQL parameters in the query instead of raw string interpolation. As I was testing this there were some errors that contained ' and the query failed, so we need to use proper sql strings here. I can move this to a separate PR if you think it's necessary.

.append(`UPDATE ${this.name} `) // table name can't be parameterized, append a raw string
.append(
sql`SET data = jsonb_set(data, '{status}', case when data->'status' is null then '{}' else data->'status' end || ${statusStr}) `
)
.append(sql`WHERE id = ${id}`)
);

if (res.rowCount < 1) {
Expand Down
162 changes: 162 additions & 0 deletions packages/api/src/webhooks/cannon.test.ts
Expand Up @@ -219,6 +219,47 @@ describe("webhook cannon", () => {
expect(called).toBe(true);
});

describe("local webhook", () => {
beforeAll(() => {
server.webhook.skipUrlVerification = false;
});

afterAll(() => {
server.webhook.skipUrlVerification = true;
});

it("should not call local webhooks", async () => {
// we create the same mock webhook, but given url verification is enabled it should not be called
const res = await client.post("/webhook", {
...mockWebhook,
name: "test non admin",
});
const resJson = await res.json();
console.log("webhook body: ", resJson);
expect(res.status).toBe(201);
expect(resJson.name).toBe("test non admin");

const sem = semaphore();
let called = false;
webhookCallback = () => {
called = true;
sem.release();
};

await server.queue.publishWebhook("events.stream.started", {
type: "webhook_event",
id: "webhook_test_12",
timestamp: Date.now(),
streamId: "streamid",
event: "stream.started",
userId: nonAdminUser.id,
});

await sem.wait(3000);
expect(called).toBe(false);
});
});

it("should call multiple webhooks", async () => {
let res = await client.post("/webhook", {
...mockWebhook,
Expand Down Expand Up @@ -355,4 +396,125 @@ describe("webhook cannon", () => {
expect(calledCounts).toEqual([4, 2]);
});
});

describe("local IP check", () => {
beforeAll(() => {
server.webhook.skipUrlVerification = false;
});

afterAll(() => {
server.webhook.skipUrlVerification = true;
});

const expectIsLocal = async (
url: string,
isLocal: boolean,
ips?: string[]
) => {
expect(await server.webhook.checkIsLocalIp(url, false)).toMatchObject({
isLocal,
ips,
});
};

it("should flag local IPs", async () => {
await expectIsLocal("http://127.0.0.1/test", true, ["127.0.0.1"]);
await expectIsLocal("http://[::1]/test", true, ["::1"]);
});

it("should flag private IPs", async () => {
await expectIsLocal("http://10.42.0.1/test", true, ["10.42.0.1"]);
await expectIsLocal("http://172.16.3.4/test", true, ["172.16.3.4"]);
await expectIsLocal("http://[fd12:3456:789a:1::1]/test", true, [
"fd12:3456:789a:1::1",
]);
});

it("should flag loopback addresses", async () => {
await expectIsLocal("http://localhost:1234/test", true, ["127.0.0.1"]);
await expectIsLocal("http://ip6-localhost:1234/test", true, ["::1"]);
await expectIsLocal("http://ip6-loopback:1234/test", true, ["::1"]);
});

it("should not flag public IPs", async () => {
await expectIsLocal("http://172.67.149.35/test", false, [
"172.67.149.35",
]);
await expectIsLocal("http://[2606:4700:3037::ac43:9523]/test", false, [
"2606:4700:3037::ac43:9523",
]);
});

describe("domain resolution", () => {
let prevResolver;
let resolverMock: ReturnType<typeof createResolverMock>;

const createResolverMock = () => ({
resolve4: jest.fn<Promise<string[]>, any, any>(),
resolve6: jest.fn<Promise<string[]>, any, any>(),
});

beforeEach(() => {
prevResolver = server.webhook.resolver;
server.webhook.resolver = resolverMock = createResolverMock() as any;
});

afterEach(() => {
server.webhook.resolver = prevResolver;
});

it("should not flag domains that resolve to public IPs", async () => {
resolverMock.resolve4.mockReturnValueOnce(
Promise.resolve(["172.67.149.35"])
);
resolverMock.resolve6.mockReturnValueOnce(
Promise.resolve(["2606:4700:3037::ac43:9523"])
);

await expectIsLocal("http://livepeer.studio/mock", false, [
"172.67.149.35",
"2606:4700:3037::ac43:9523",
]);
expect(resolverMock.resolve4.mock.calls).toHaveLength(1);
expect(resolverMock.resolve4.mock.calls[0][0]).toEqual(
"livepeer.studio"
);
expect(resolverMock.resolve6.mock.calls).toHaveLength(1);
expect(resolverMock.resolve6.mock.calls[0][0]).toEqual(
"livepeer.studio"
);
});

const privateTestCases = [
{ name: "IPv4-only", ipv4: ["10.42.0.10"], ipv6: [] },
{ name: "IPv6-only", ipv4: [], ipv6: ["::1"] },
{ name: "IPv4 and IPv6", ipv4: ["172.0.0.1"], ipv6: ["::1"] },
{
name: "mixed private and public IPs",
ipv4: ["172.67.149.35", "172.16.34.123"],
ipv6: ["2606:4700:3037::ac43:9523", "fd12:3456:789a:1::1"],
},
];

for (const { name, ipv4, ipv6 } of privateTestCases) {
it(`should flag domains that resolve to private IPs (${name})`, async () => {
resolverMock.resolve4.mockReturnValueOnce(Promise.resolve(ipv4));
resolverMock.resolve6.mockReturnValueOnce(Promise.resolve(ipv6));

await expectIsLocal("http://local.mydomain.com/test", true, [
...ipv4,
...ipv6,
]);
expect(resolverMock.resolve4.mock.calls).toHaveLength(1);
expect(resolverMock.resolve4.mock.calls[0][0]).toEqual(
"local.mydomain.com"
);
expect(resolverMock.resolve6.mock.calls).toHaveLength(1);
expect(resolverMock.resolve6.mock.calls[0][0]).toEqual(
"local.mydomain.com"
);
});
}
});
});
});
88 changes: 48 additions & 40 deletions packages/api/src/webhooks/cannon.ts
@@ -1,7 +1,8 @@
import { ConsumeMessage } from "amqplib";
import { promises as dns } from "dns";
import dns from "dns";
import isLocalIP from "is-local-ip";
import _ from "lodash";
import { isIP } from "net";
import { Response } from "node-fetch";
import { parse as parseUrl } from "url";
import { v4 as uuid } from "uuid";
Expand Down Expand Up @@ -42,7 +43,7 @@ function isRuntimeError(err: any): boolean {

export default class WebhookCannon {
running: boolean;
verifyUrls: boolean;
skipUrlVerification: boolean;
frontendDomain: string;
sendgridTemplateId: string;
sendgridApiKey: string;
Expand All @@ -51,7 +52,7 @@ export default class WebhookCannon {
secondaryVodObjectStoreId: string;
recordCatalystObjectStoreId: string;
secondaryRecordObjectStoreId: string;
resolver: any;
resolver: dns.promises.Resolver;
queue: Queue;
constructor({
frontendDomain,
Expand All @@ -62,11 +63,11 @@ export default class WebhookCannon {
secondaryVodObjectStoreId,
recordCatalystObjectStoreId,
secondaryRecordObjectStoreId,
verifyUrls,
skipUrlVerification,
queue,
}) {
this.running = true;
this.verifyUrls = verifyUrls;
this.skipUrlVerification = skipUrlVerification;
this.frontendDomain = frontendDomain;
this.sendgridTemplateId = sendgridTemplateId;
this.sendgridApiKey = sendgridApiKey;
Expand All @@ -75,7 +76,7 @@ export default class WebhookCannon {
this.secondaryVodObjectStoreId = secondaryVodObjectStoreId;
this.recordCatalystObjectStoreId = recordCatalystObjectStoreId;
this.secondaryRecordObjectStoreId = secondaryRecordObjectStoreId;
this.resolver = new dns.Resolver();
this.resolver = new dns.promises.Resolver();
this.queue = queue;
// this.start();
}
Expand Down Expand Up @@ -208,8 +209,7 @@ export default class WebhookCannon {
return;
}
try {
// TODO Activate URL Verification
await this._fireHook(trigger, false);
await this._fireHook(trigger);
} catch (err) {
console.log("_fireHook error", err);
await this.retry(trigger, null, err);
Expand All @@ -223,10 +223,6 @@ export default class WebhookCannon {
this.running = false;
}

disableUrlVerify() {
this.verifyUrls = false;
}

public calcBackoff = (lastInterval?: number): number => {
if (!lastInterval || lastInterval < 1000) {
return 5000;
Expand Down Expand Up @@ -328,7 +324,7 @@ export default class WebhookCannon {
);
}

async _fireHook(trigger: messages.WebhookTrigger, verifyUrl = true) {
async _fireHook(trigger: messages.WebhookTrigger) {
const { event, webhook, stream, user } = trigger;
if (!event || !webhook || !user) {
console.error(
Expand All @@ -338,34 +334,15 @@ export default class WebhookCannon {
return;
}
console.log(`trying webhook ${webhook.name}: ${webhook.url}`);
let ips, urlObj, isLocal;
if (verifyUrl) {
try {
urlObj = parseUrl(webhook.url);
if (urlObj.host) {
ips = await this.resolver.resolve4(urlObj.hostname);
}
} catch (e) {
console.error("error: ", e);
throw e;
}
}

// This is mainly useful for local testing
if (user.admin || verifyUrl === false) {
isLocal = false;
} else {
try {
if (ips && ips.length) {
isLocal = isLocalIP(ips[0]);
} else {
isLocal = true;
}
} catch (e) {
console.error("isLocal Error", isLocal, e);
throw e;
}
}
const { ips, isLocal } = await this.checkIsLocalIp(
webhook.url,
user.admin
).catch((e) => {
console.error("error checking if is local IP: ", e);
return { ips: [], isLocal: false };
});

if (isLocal) {
// don't fire this webhook.
console.log(
Expand Down Expand Up @@ -464,6 +441,37 @@ export default class WebhookCannon {
}
}

public async checkIsLocalIp(url: string, isAdmin: boolean) {
if (isAdmin || this.skipUrlVerification) {
// this is mainly useful for local testing
return { ips: [], isLocal: false };
}

const emptyIfNotFound = (err) => {
if ([dns.NODATA, dns.NOTFOUND, dns.BADFAMILY].includes(err.code)) {
return [] as string[];
}
throw err;
};

const { hostname } = parseUrl(url);
if (["localhost", "ip6-localhost", "ip6-loopback"].includes(hostname)) {
// dns.resolve functions do not take /etc/hosts into account, so we need to handle these separately
const ips = hostname === "localhost" ? ["127.0.0.1"] : ["::1"];
return { ips, isLocal: true };
}

const ips = isIP(hostname)
? [hostname]
: await Promise.all([
this.resolver.resolve4(hostname).catch(emptyIfNotFound),
this.resolver.resolve6(hostname).catch(emptyIfNotFound),
]).then((ipsArrs) => ipsArrs.flat());

const isLocal = ips.some(isLocalIP);
return { ips, isLocal };
}

async storeTriggerStatus(
webhook: DBWebhook,
triggerTime: number,
Expand Down