Skip to content

Commit

Permalink
Use fsinfo for cockpit.file.watch()
Browse files Browse the repository at this point in the history
  • Loading branch information
jelly committed Feb 13, 2024
1 parent 8f2bed6 commit b14f6cf
Show file tree
Hide file tree
Showing 4 changed files with 434 additions and 20 deletions.
17 changes: 17 additions & 0 deletions pkg/base1/test-file.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ QUnit.test("watching", assert => {
const file = cockpit.file(dir + "/foobar");
let n = 0;
const watch = file.watch((content, tag) => {
console.log('got event', content, tag);
n += 1;
if (n == 1) {
assert.equal(content, null, "initially non-existent");
Expand Down Expand Up @@ -326,6 +327,22 @@ QUnit.test("watching without reading", assert => {
}, { read: false });
});

QUnit.test("watching without reading pre-created", async assert => {
const done = assert.async();
assert.expect(3);

// Pre-create foobar
const file = cockpit.file(dir + "/fsinfo");
await cockpit.spawn(["bash", "-c", `echo 1234 > ${dir}/fsinfo`]);
const watch = file.watch((content, tag) => {
assert.equal(content, null, "non-existant because read is false");
assert.notEqual(tag, null, "non empty tag");
assert.notEqual(tag, "-", "non empty tag");
watch.remove();
done();
}, { read: false });
});

QUnit.test("watching directory", assert => {
const done = assert.async();
assert.expect(20);
Expand Down
266 changes: 266 additions & 0 deletions pkg/lib/cockpit-file.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* This file is part of Cockpit.
*
* Copyright (C) 2024 Red Hat, Inc.
*
* Cockpit is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* Cockpit is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Cockpit; If not, see <http://www.gnu.org/licenses/>.
*/

export function file(path, options) {
options = options || { };
const binary = options.binary;

const self = {
path,
read,
replace,
modify,

watch,

close
};

const base_channel_options = { ...options };
delete base_channel_options.syntax;

function parse(str) {
if (options.syntax?.parse)
return options.syntax.parse(str);
else
return str;
}

function stringify(obj) {
if (options.syntax?.stringify)
return options.syntax.stringify(obj);
else
return obj;
}

let read_promise = null;
let read_channel;

function read() {
if (read_promise)
return read_promise;

const dfd = new Promise();
const opts = {
...base_channel_options,
payload: "fsread1",
path
};

function try_read() {
read_channel = cockpit.channel(opts);
const content_parts = [];
read_channel.addEventListener("message", function (event, message) {
content_parts.push(message);
});
read_channel.addEventListener("close", function (event, message) {
read_channel = null;

if (message.problem == "change-conflict") {
try_read();
return;
}

read_promise = null;

if (message.problem) {
const error = new BasicError(message.problem, message.message);
fire_watch_callbacks(null, null, error);
dfd.reject(error);
return;
}

let content;
if (message.tag == "-")
content = null;
else {
try {
content = parse(join_data(content_parts, binary));
} catch (e) {
fire_watch_callbacks(null, null, e);
dfd.reject(e);
return;
}
}

fire_watch_callbacks(content, message.tag);
dfd.resolve(content, message.tag);
});
}

try_read();

read_promise = dfd.promise;
return read_promise;
}

let replace_channel = null;

function replace(new_content, expected_tag) {
const dfd = cockpit.defer();

let file_content;
try {
file_content = (new_content === null) ? null : stringify(new_content);
} catch (e) {
dfd.reject(e);
return dfd.promise;
}

if (replace_channel)
replace_channel.close("abort");

const opts = {
...base_channel_options,
payload: "fsreplace1",
path,
tag: expected_tag
};
replace_channel = cockpit.channel(opts);

replace_channel.addEventListener("close", function (event, message) {
replace_channel = null;
if (message.problem) {
dfd.reject(new BasicError(message.problem, message.message));
} else {
fire_watch_callbacks(new_content, message.tag);
dfd.resolve(message.tag);
}
});

iterate_data(file_content, function(data) {
replace_channel.send(data);
});

replace_channel.control({ command: "done" });
return dfd.promise;
}

function modify(callback, initial_content, initial_tag) {
const dfd = cockpit.defer();

function update(content, tag) {
let new_content = callback(content);
if (new_content === undefined)
new_content = content;
replace(new_content, tag)
.done(function (new_tag) {
dfd.resolve(new_content, new_tag);
})
.fail(function (error) {
if (error.problem == "change-conflict")
read_then_update();
else
dfd.reject(error);
});
}

function read_then_update() {
read()
.done(update)
.fail(function (error) {
dfd.reject(error);
});
}

if (initial_content === undefined)
read_then_update();
else
update(initial_content, initial_tag);

return dfd.promise;
}

const watch_callbacks = [];
let n_watch_callbacks = 0;

let watch_channel = null;
let watch_tag;

function ensure_watch_channel(options) {
if (n_watch_callbacks > 0) {
if (watch_channel)
return;

const opts = {
payload: "fswatch1",
path,
superuser: base_channel_options.superuser,
};
watch_channel = cockpit.channel(opts);
watch_channel.addEventListener("message", function (event, message_string) {
let message;
try {
message = JSON.parse(message_string);
} catch (e) {
message = null;
}
if (message && message.path == path && message.tag && message.tag != watch_tag) {
if (options && options.read !== undefined && !options.read)
fire_watch_callbacks(null, message.tag);
else
read();
}
});
} else {
if (watch_channel) {
watch_channel.close();
watch_channel = null;
}
}
}

function fire_watch_callbacks(/* content, tag, error */) {
watch_tag = arguments[1] || null;
invoke_functions(watch_callbacks, self, arguments);
}

function watch(callback, options) {
if (callback)
watch_callbacks.push(callback);
n_watch_callbacks += 1;
ensure_watch_channel(options);

watch_tag = null;
read();

return {
remove: function () {
if (callback) {
const index = watch_callbacks.indexOf(callback);
if (index > -1)
watch_callbacks[index] = null;
}
n_watch_callbacks -= 1;
ensure_watch_channel(options);
}
};
}

function close() {
if (read_channel)
read_channel.close("cancelled");
if (replace_channel)
replace_channel.close("cancelled");
if (watch_channel)
watch_channel.close("cancelled");
}

return self;
}

0 comments on commit b14f6cf

Please sign in to comment.