mirror of
https://github.com/microsoft/vscode.git
synced 2026-06-13 02:29:02 -05:00
agentHost: batch AHP log writes and skip URI deep clone (#318864)
* agentHost: batch AHP log writes and skip URI deep clone

  Two perf fixes for AhpJsonlLogger, the per-message JSONL transport logger. CPU traces of the agents window under heavy AHP traffic showed ~23% of wall time in MajorGC, dominated by VSBuffer allocations from renderer-side writeFile IPCs and the recursive _replaceUris deep clone applied to every message before JSON.stringify.

  - log() now appends to a pending buffer list and schedules a single drain on the write queue. While a writeFile is in flight, all subsequent log() calls accumulate and land in the next drain via VSBuffer.concat, capped at 1 MiB per write. Rotation is still checked per entry, and flush()/ordering semantics are preserved via a _drainScheduled invariant.
  - stringifyAhpLogEntry() now uses a JSON.stringify replacer instead of a tree-walking deep clone. URI.toJSON() stamps the output with $mid: MarshalledId.Uri, which the replacer detects (guarded by isUriComponents) and rewrites to the canonical URI string. This removes per-message allocation of a clone of the entire message payload.

  Adds tests for batching, flush ordering across drains, and URI replacer edge cases (nested URIs, raw UriComponents from prior toJSON, URI-shaped objects without $mid, and non-URI objects that happen to carry $mid: 1).

  (Written by Copilot)

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* agentHost: tighten batching test to assert exactly one write

  Addresses review feedback: the previous `writeCount < messageCount` assertion would pass even with a near-pathological 49-writes-for-50-logs regression. All 50 log() calls are queued synchronously and must land in the very first drain, so assert writeCount === 1.

  (Written by Copilot)

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -5,8 +5,9 @@
|
||||
|
||||
import { VSBuffer } from '../../../base/common/buffer.js';
|
||||
import { Disposable } from '../../../base/common/lifecycle.js';
|
||||
import { MarshalledId } from '../../../base/common/marshallingIds.js';
|
||||
import { joinPath } from '../../../base/common/resources.js';
|
||||
import { URI } from '../../../base/common/uri.js';
|
||||
import { isUriComponents, URI, UriComponents } from '../../../base/common/uri.js';
|
||||
import { IFileService, IFileStatWithMetadata } from '../../files/common/files.js';
|
||||
import { ILogService } from '../../log/common/log.js';
|
||||
|
||||
@@ -23,6 +24,11 @@ export interface IAhpJsonlLoggerOptions {
|
||||
const AHP_LOG_DIR = 'ahp';
|
||||
const DEFAULT_MAX_FILE_SIZE_BYTES = 75 * 1024 * 1024;
|
||||
const DEFAULT_MAX_FILES = 5;
|
||||
// Cap the size of any single coalesced writeFile to avoid producing huge
|
||||
// concatenated VSBuffers (which would just create the GC pressure we're
|
||||
// trying to avoid). 1 MiB strikes a balance between amortizing IPC overhead
|
||||
// and keeping per-write allocations modest.
|
||||
const MAX_BATCH_BYTES = 1024 * 1024;
|
||||
|
||||
export class AhpJsonlLogger extends Disposable {
|
||||
|
||||
@@ -34,6 +40,8 @@ export class AhpJsonlLogger extends Disposable {
|
||||
private _currentSize = 0;
|
||||
private _segment = 0;
|
||||
private _queue = Promise.resolve();
|
||||
private _pending: VSBuffer[] = [];
|
||||
private _drainScheduled = false;
|
||||
private _folderCreated: Promise<IFileStatWithMetadata> | undefined;
|
||||
|
||||
constructor(
|
||||
@@ -67,17 +75,37 @@ export class AhpJsonlLogger extends Disposable {
|
||||
}
|
||||
};
|
||||
const line = `${stringifyAhpLogEntry(entry)}\n`;
|
||||
const buffer = VSBuffer.fromString(line);
|
||||
this._queue = this._queue.then(() => this._appendLine(buffer)).catch(error => {
|
||||
this._pending.push(VSBuffer.fromString(line));
|
||||
this._scheduleDrain();
|
||||
}
|
||||
|
||||
/**
 * Waits for the write queue as it stands at the time of the call.
 *
 * Drain failures are caught and logged where the drain is chained onto the
 * queue, so this is not expected to reject because of a failed write.
 */
async flush(): Promise<void> {
|
||||
// Pending entries always have a drain scheduled (see _scheduleDrain), so
|
||||
// awaiting the queue is sufficient to flush everything submitted before
|
||||
// this call.
|
||||
await this._queue;
|
||||
}
|
||||
|
||||
/**
 * Chains a single `_drainPending()` call onto the write queue unless one is
 * already scheduled. The `_drainScheduled` flag is what collapses many
 * `log()` calls into one queued drain.
 */
private _scheduleDrain(): void {
|
||||
// A drain is already queued; it will take whatever is in `_pending` when it runs.
if (this._drainScheduled) {
|
||||
return;
|
||||
}
|
||||
this._drainScheduled = true;
|
||||
// The catch handler logs the failure and leaves the queue resolved, so
// later drains can still be chained after a failed one.
this._queue = this._queue.then(() => this._drainPending()).catch(error => {
|
||||
this._logService.error('[AHPLog] Failed to write transport log', error);
|
||||
});
|
||||
}
|
||||
|
||||
async flush(): Promise<void> {
|
||||
await this._queue;
|
||||
}
|
||||
private async _drainPending(): Promise<void> {
|
||||
// Clear the scheduled flag before snapshotting _pending so that any log()
|
||||
// calls happening during the awaits below will schedule a fresh drain.
|
||||
this._drainScheduled = false;
|
||||
if (this._pending.length === 0) {
|
||||
return;
|
||||
}
|
||||
const buffers = this._pending;
|
||||
this._pending = [];
|
||||
|
||||
private async _appendLine(buffer: VSBuffer): Promise<void> {
|
||||
// Create folder once and memoize to avoid repeated filesystem calls
|
||||
if (!this._folderCreated) {
|
||||
this._folderCreated = this._fileService.createFolder(this._directory);
|
||||
@@ -86,11 +114,37 @@ export class AhpJsonlLogger extends Disposable {
|
||||
if (this._currentSize === 0) {
|
||||
this._currentSize = await this._getFileSize(this._currentFile);
|
||||
}
|
||||
if (this._currentSize > 0 && this._currentSize + buffer.byteLength > this._maxFileSizeBytes) {
|
||||
await this._rotate();
|
||||
|
||||
// Coalesce buffers into chunks, respecting both file-rotation size and the
|
||||
// per-write batch cap. Rotation is checked per-entry to preserve the
|
||||
// invariant that we don't exceed maxFileSizeBytes once a file has data.
|
||||
let chunk: VSBuffer[] = [];
|
||||
let chunkSize = 0;
|
||||
const flushChunk = async () => {
|
||||
if (chunk.length === 0) {
|
||||
return;
|
||||
}
|
||||
const combined = chunk.length === 1 ? chunk[0] : VSBuffer.concat(chunk, chunkSize);
|
||||
await this._fileService.writeFile(this._currentFile, combined, { append: true });
|
||||
this._currentSize += combined.byteLength;
|
||||
chunk = [];
|
||||
chunkSize = 0;
|
||||
};
|
||||
|
||||
for (const buffer of buffers) {
|
||||
const totalInFile = this._currentSize + chunkSize;
|
||||
if (totalInFile > 0 && totalInFile + buffer.byteLength > this._maxFileSizeBytes) {
|
||||
await flushChunk();
|
||||
await this._rotate();
|
||||
} else if (chunkSize > 0 && chunkSize + buffer.byteLength > MAX_BATCH_BYTES) {
|
||||
// Same file but the batch is getting too large; flush early to
|
||||
// avoid creating an oversized concatenated VSBuffer.
|
||||
await flushChunk();
|
||||
}
|
||||
chunk.push(buffer);
|
||||
chunkSize += buffer.byteLength;
|
||||
}
|
||||
await this._fileService.writeFile(this._currentFile, buffer, { append: true });
|
||||
this._currentSize += buffer.byteLength;
|
||||
await flushChunk();
|
||||
}
|
||||
|
||||
private async _rotate(): Promise<void> {
|
||||
@@ -127,29 +181,25 @@ export function getAhpLogByteLength(text: string): number {
|
||||
}
|
||||
|
||||
export function stringifyAhpLogEntry(value: unknown): string {
|
||||
return JSON.stringify(_replaceUris(value));
|
||||
return JSON.stringify(value, _ahpReplacer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively replaces {@link URI} instances with their string form before
|
||||
* handing the value to JSON.stringify. A replacer function is NOT sufficient
|
||||
* because JSON.stringify calls toJSON() on an object *before* invoking the
|
||||
* replacer, so the replacer would receive a plain UriComponents object rather
|
||||
* than the original URI instance, and URI.isUri() would return false for it.
|
||||
* JSON.stringify replacer that converts URI values to their canonical string
|
||||
* form. `URI.prototype.toJSON()` runs before this replacer is invoked and
|
||||
* produces a {@link UriComponents}-shaped object stamped with
|
||||
* `$mid: MarshalledId.Uri`, which we detect here to round-trip back through
|
||||
* {@link URI.revive}. This avoids the expensive deep-clone tree walk that
|
||||
* would otherwise be required to find every URI in a message payload.
|
||||
*/
|
||||
function _replaceUris(value: unknown): unknown {
|
||||
if (URI.isUri(value)) {
|
||||
return value.toString();
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
return value.map(_replaceUris);
|
||||
}
|
||||
if (value !== null && typeof value === 'object') {
|
||||
const result: Record<string, unknown> = {};
|
||||
for (const key of Object.keys(value)) {
|
||||
result[key] = _replaceUris((value as Record<string, unknown>)[key]);
|
||||
}
|
||||
return result;
|
||||
// Replacer passed to JSON.stringify by stringifyAhpLogEntry.
// Returns the string form of the revived URI for values recognised as
// marshalled URI components, and returns every other value unchanged.
// The key argument is not consulted; the decision depends on the value alone.
function _ahpReplacer(this: unknown, _key: string, value: unknown): unknown {
|
||||
// Both markers are required: the `$mid` tag on its own, or a URI-like
// shape on its own, must not cause a rewrite.
if (
|
||||
value
|
||||
&& typeof value === 'object'
|
||||
&& (value as { $mid?: number }).$mid === MarshalledId.Uri
|
||||
&& isUriComponents(value)
|
||||
) {
|
||||
return URI.revive(value as UriComponents).toString();
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
@@ -8,9 +8,10 @@ import { basename, dirname, joinPath } from '../../../../base/common/resources.j
|
||||
import { URI } from '../../../../base/common/uri.js';
|
||||
import { ensureNoDisposablesAreLeakedInTestSuite } from '../../../../base/test/common/utils.js';
|
||||
import { FileService } from '../../../files/common/fileService.js';
|
||||
import { IFileWriteOptions } from '../../../files/common/files.js';
|
||||
import { InMemoryFileSystemProvider } from '../../../files/common/inMemoryFilesystemProvider.js';
|
||||
import { NullLogService } from '../../../log/common/log.js';
|
||||
import { AhpJsonlLogger, getAhpLogByteLength } from '../../common/ahpJsonlLogger.js';
|
||||
import { AhpJsonlLogger, getAhpLogByteLength, stringifyAhpLogEntry } from '../../common/ahpJsonlLogger.js';
|
||||
|
||||
suite('AhpJsonlLogger', () => {
|
||||
|
||||
@@ -147,4 +148,121 @@ suite('AhpJsonlLogger', () => {
|
||||
rootsAreJsonRpc: true,
|
||||
});
|
||||
});
|
||||
|
||||
test('coalesces synchronously queued log calls into a single write', async () => {
|
||||
const fileService = store.add(new FileService(new NullLogService()));
|
||||
const provider = store.add(new RecordingInMemoryFileSystemProvider());
|
||||
store.add(fileService.registerProvider('file', provider));
|
||||
|
||||
const logger = store.add(new AhpJsonlLogger(
|
||||
{ logsHome: URI.file('/logs'), connectionId: 'batched', transport: 'websocket' },
|
||||
fileService,
|
||||
new NullLogService(),
|
||||
));
|
||||
|
||||
const messageCount = 50;
|
||||
for (let i = 0; i < messageCount; i++) {
|
||||
logger.log({ jsonrpc: '2.0', id: i, result: { ok: true } }, 's2c');
|
||||
}
|
||||
await logger.flush();
|
||||
|
||||
const content = (await fileService.readFile(logger.resource)).value.toString();
|
||||
const lines = content.split('\n').filter(Boolean);
|
||||
const ids = lines.map(line => JSON.parse(line).id);
|
||||
|
||||
// All 50 log() calls are queued synchronously, so they all land in the
|
||||
// first drain and must be coalesced into exactly one writeFile.
|
||||
assert.deepStrictEqual({
|
||||
lineCount: lines.length,
|
||||
idsInOrder: ids,
|
||||
writeCount: provider.writeCount,
|
||||
}, {
|
||||
lineCount: messageCount,
|
||||
idsInOrder: Array.from({ length: messageCount }, (_, i) => i),
|
||||
writeCount: 1,
|
||||
});
|
||||
});
|
||||
|
||||
test('flush waits for batched writes and ordering is preserved across drains', async () => {
|
||||
const fileService = store.add(new FileService(new NullLogService()));
|
||||
store.add(fileService.registerProvider('file', store.add(new InMemoryFileSystemProvider())));
|
||||
|
||||
const logger = store.add(new AhpJsonlLogger(
|
||||
{ logsHome: URI.file('/logs'), connectionId: 'flush-order', transport: 'websocket' },
|
||||
fileService,
|
||||
new NullLogService(),
|
||||
));
|
||||
|
||||
// Submit a batch, partially flush, then submit another batch interleaved
|
||||
// with the flush — ordering must be preserved.
|
||||
logger.log({ jsonrpc: '2.0', id: 1, result: 'a' }, 's2c');
|
||||
logger.log({ jsonrpc: '2.0', id: 2, result: 'b' }, 's2c');
|
||||
const firstFlush = logger.flush();
|
||||
logger.log({ jsonrpc: '2.0', id: 3, result: 'c' }, 's2c');
|
||||
await firstFlush;
|
||||
logger.log({ jsonrpc: '2.0', id: 4, result: 'd' }, 's2c');
|
||||
await logger.flush();
|
||||
|
||||
const content = (await fileService.readFile(logger.resource)).value.toString();
|
||||
const ids = content.split('\n').filter(Boolean).map(line => JSON.parse(line).id);
|
||||
assert.deepStrictEqual(ids, [1, 2, 3, 4]);
|
||||
});
|
||||
|
||||
suite('stringifyAhpLogEntry', () => {
|
||||
|
||||
test('serialises a top-level URI as its string form', () => {
|
||||
const uri = URI.parse('file:///tmp/example.txt');
|
||||
const result = JSON.parse(stringifyAhpLogEntry({ uri }));
|
||||
assert.strictEqual(result.uri, uri.toString());
|
||||
});
|
||||
|
||||
test('serialises URIs nested in arrays and objects', () => {
|
||||
const a = URI.parse('file:///a');
|
||||
const b = URI.parse('https://example.com/b?x=1');
|
||||
const c = URI.parse('untitled:Untitled-1');
|
||||
const payload = {
|
||||
items: [a, { nested: b }, [c]],
|
||||
};
|
||||
const result = JSON.parse(stringifyAhpLogEntry(payload));
|
||||
assert.deepStrictEqual(result, {
|
||||
items: [a.toString(), { nested: b.toString() }, [c.toString()]],
|
||||
});
|
||||
});
|
||||
|
||||
test('round-trips raw UriComponents marked with $mid', () => {
|
||||
const uri = URI.parse('vscode://example/path');
|
||||
const components = uri.toJSON();
|
||||
// Simulate a value that came back over IPC and was never revived
|
||||
const result = JSON.parse(stringifyAhpLogEntry({ uri: components }));
|
||||
assert.strictEqual(result.uri, uri.toString());
|
||||
});
|
||||
|
||||
test('leaves URI-shaped objects without $mid as plain objects', () => {
|
||||
// A user payload that happens to have URI-like fields but is not a
|
||||
// URI must not be silently rewritten.
|
||||
const payload = {
|
||||
scheme: 'not-a-uri',
|
||||
path: '/something',
|
||||
};
|
||||
const result = JSON.parse(stringifyAhpLogEntry(payload));
|
||||
assert.deepStrictEqual(result, payload);
|
||||
});
|
||||
|
||||
test('does not misidentify non-URI objects that carry $mid: 1', () => {
|
||||
// $mid is only safely a URI marker when the object also has the
|
||||
// UriComponents shape (scheme: string). Non-conforming payloads
|
||||
// must pass through unchanged.
|
||||
const payload = { $mid: 1, label: 'not a uri' };
|
||||
const result = JSON.parse(stringifyAhpLogEntry(payload));
|
||||
assert.deepStrictEqual(result, payload);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/**
 * Test double: an in-memory file system provider that counts `writeFile`
 * calls, so a test can assert how many provider-level writes were issued.
 */
class RecordingInMemoryFileSystemProvider extends InMemoryFileSystemProvider {
|
||||
// Number of writeFile invocations seen so far.
writeCount = 0;
|
||||
// Counts the call, then delegates to the base implementation. The increment
// happens first, so a write that fails in the base class is still counted.
override async writeFile(resource: URI, content: Uint8Array, opts: IFileWriteOptions): Promise<void> {
|
||||
this.writeCount++;
|
||||
return super.writeFile(resource, content, opts);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user