Skip to content

Commit

Permalink
[ package:vm_service ] Automatically invoke VmService.dispose() when …
Browse files Browse the repository at this point in the history
…the service connection closes

Fixes #55559

Change-Id: I213ae3960c15bf2a68b4113a26f333090266b9c9
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/365060
Reviewed-by: Derek Xu <derekx@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
  • Loading branch information
bkonyi authored and Commit Queue committed May 2, 2024
1 parent 73f5417 commit a7d8707
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 21 deletions.
4 changes: 4 additions & 0 deletions pkg/vm_service/CHANGELOG.md
@@ -1,3 +1,7 @@
## 14.2.2
- Fixes issue where outstanding service requests were not automatically completed
with an error when the VM service connection was closed.

## 14.2.1
- Fixes heap snapshot decoding error (dart-lang/sdk#55475).

Expand Down
23 changes: 13 additions & 10 deletions pkg/vm_service/lib/src/vm_service.dart
Expand Up @@ -300,6 +300,8 @@ class VmService {
Future<void> get onDone => _onDoneCompleter.future;
final _onDoneCompleter = Completer<void>();

bool _disposed = false;

final _eventControllers = <String, StreamController<Event>>{};

StreamController<Event> _getEventController(String eventName) {
Expand All @@ -321,16 +323,14 @@ class VmService {
Future? streamClosed,
this.wsUri,
}) {
_streamSub = inStream.listen(_processMessage,
onDone: () => _onDoneCompleter.complete());
_streamSub = inStream.listen(
_processMessage,
onDone: () async => await dispose(),
);
_writeMessage = writeMessage;
_log = log ?? _NullLog();
_disposeHandler = disposeHandler;
streamClosed?.then((_) {
if (!_onDoneCompleter.isCompleted) {
_onDoneCompleter.complete();
}
});
streamClosed?.then((_) async => await dispose());
}

static VmService defaultFactory({
Expand Down Expand Up @@ -1735,6 +1735,10 @@ class VmService {
}

Future<void> dispose() async {
if (_disposed) {
return;
}
_disposed = true;
await _streamSub.cancel();
_outstandingRequests.forEach((id, request) {
request._completer.completeError(RPCError(
Expand All @@ -1748,9 +1752,8 @@ class VmService {
if (handler != null) {
await handler();
}
if (!_onDoneCompleter.isCompleted) {
_onDoneCompleter.complete();
}
assert(!_onDoneCompleter.isCompleted);
_onDoneCompleter.complete();
}

/// When overridden, this method wraps [future] with logic.
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm_service/pubspec.yaml
@@ -1,5 +1,5 @@
name: vm_service
version: 14.2.1
version: 14.2.2
description: >-
A library to communicate with a service implementing the Dart VM
service protocol.
Expand Down
46 changes: 46 additions & 0 deletions pkg/vm_service/test/common/utils.dart
@@ -0,0 +1,46 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:convert';
import 'dart:io';

// TODO(bkonyi): Share this logic with _ServiceTesteeRunner.launch.
Future<(Process, Uri)> spawnDartProcess(
String script, {
bool serveObservatory = true,
bool pauseOnStart = true,
bool disableServiceAuthCodes = false,
bool subscribeToStdio = true,
}) async {
final executable = Platform.executable;
final tmpDir = await Directory.systemTemp.createTemp('dart_service');
final serviceInfoUri = tmpDir.uri.resolve('service_info.json');
final serviceInfoFile = await File.fromUri(serviceInfoUri).create();

final arguments = [
'--disable-dart-dev',
'--observe=0',
if (!serveObservatory) '--no-serve-observatory',
if (pauseOnStart) '--pause-isolates-on-start',
if (disableServiceAuthCodes) '--disable-service-auth-codes',
'--write-service-info=$serviceInfoUri',
...Platform.executableArguments,
Platform.script.resolve(script).toString(),
];
final process = await Process.start(executable, arguments);
if (subscribeToStdio) {
process.stdout
.transform(utf8.decoder)
.listen((line) => print('TESTEE OUT: $line'));
process.stderr
.transform(utf8.decoder)
.listen((line) => print('TESTEE ERR: $line'));
}
while ((await serviceInfoFile.length()) <= 5) {
await Future.delayed(const Duration(milliseconds: 50));
}
final content = await serviceInfoFile.readAsString();
final infoJson = json.decode(content);
return (process, Uri.parse(infoJson['uri']));
}
10 changes: 10 additions & 0 deletions pkg/vm_service/test/regress_55559_script.dart
@@ -0,0 +1,10 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:io';

void main() {
// Block the thread so the isolate can't response to service requests.
sleep(const Duration(hours: 1));
}
77 changes: 77 additions & 0 deletions pkg/vm_service/test/regress_55559_test.dart
@@ -0,0 +1,77 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

// Regression test for https://github.com/dart-lang/sdk/issues/55559.
//
// Ensures that the `VmService` instance calls `dispose()` automatically if the
// VM service connection goes down. Without the `dispose()` call, outstanding
// requests won't complete unless the developer registered a callback for
// `VmService.onDone` that calls `dispose()`.

import 'dart:async';
import 'dart:io';

import 'package:test/test.dart';
import 'package:vm_service/vm_service.dart';
import 'package:vm_service/vm_service_io.dart';

import 'common/utils.dart';

void main() {
(Process, Uri)? state;

void killProcess() {
if (state != null) {
final (process, _) = state!;
process.kill();
state = null;
}
}

setUp(() async {
state = await spawnDartProcess(
'regress_55559_script.dart',
pauseOnStart: false,
);
});

tearDown(() {
killProcess();
});

test(
'Regress 55559: VmService closes outstanding requests on service disconnect',
() async {
final (_, uri) = state!;
final wsUri = uri.replace(
scheme: 'ws',
pathSegments: [
// The path will have a trailing '/', so the last path segment is the
// empty string and should be removed.
...[...uri.pathSegments]..removeLast(),
'ws',
],
);
final service = await vmServiceConnectUri(wsUri.toString());
final vm = await service.getVM();
final isolate = vm.isolates!.first;
final errorCompleter = Completer<RPCError>();
unawaited(
service.getIsolate(isolate.id!).then(
(_) => fail('Future should throw'),
onError: (e) => errorCompleter.complete(e),
),
);
killProcess();

// Wait for the process to exit and the service connection to close.
await service.onDone;

// The outstanding getIsolate request should be completed with an error.
final error = await errorCompleter.future;
expect(error.code, RPCErrorKind.kServerError.code);
expect(error.message, 'Service connection disposed');
},
);
}
23 changes: 13 additions & 10 deletions pkg/vm_service/tool/dart/generate_dart_client.dart
Expand Up @@ -65,6 +65,10 @@ export 'snapshot_graph.dart' show HeapSnapshotClass,
}
Future<void> dispose() async {
if (_disposed) {
return;
}
_disposed = true;
await _streamSub.cancel();
_outstandingRequests.forEach((id, request) {
request._completer.completeError(RPCError(
Expand All @@ -78,9 +82,8 @@ export 'snapshot_graph.dart' show HeapSnapshotClass,
if (handler != null) {
await handler();
}
if (!_onDoneCompleter.isCompleted) {
_onDoneCompleter.complete();
}
assert(!_onDoneCompleter.isCompleted);
_onDoneCompleter.complete();
}
/// When overridden, this method wraps [future] with logic.
Expand Down Expand Up @@ -581,6 +584,8 @@ typedef VmServiceFactory<T extends VmService> = T Function({
Future<void> get onDone => _onDoneCompleter.future;
final _onDoneCompleter = Completer<void>();
bool _disposed = false;
final _eventControllers = <String, StreamController<Event>>{};
StreamController<Event> _getEventController(String eventName) {
Expand All @@ -602,16 +607,14 @@ typedef VmServiceFactory<T extends VmService> = T Function({
Future? streamClosed,
this.wsUri,
}) {
_streamSub = inStream.listen(_processMessage,
onDone: () => _onDoneCompleter.complete());
_streamSub = inStream.listen(
_processMessage,
onDone: () async => await dispose(),
);
_writeMessage = writeMessage;
_log = log ?? _NullLog();
_disposeHandler = disposeHandler;
streamClosed?.then((_) {
if (!_onDoneCompleter.isCompleted) {
_onDoneCompleter.complete();
}
});
streamClosed?.then((_) async => await dispose());
}
static VmService defaultFactory({
Expand Down

0 comments on commit a7d8707

Please sign in to comment.