Merge pull request #19864 from Microsoft/dev/aozgaa/eventPortTelemetry

Send events through a single stream
This commit is contained in:
Arthur Ozga
2017-11-17 20:22:57 -08:00
committed by GitHub
5 changed files with 249 additions and 223 deletions

View File

@@ -1447,7 +1447,9 @@ namespace ts.server {
}
this.seenProjects.set(projectKey, true);
if (!this.eventHandler) return;
if (!this.eventHandler) {
return;
}
const data: ProjectInfoTelemetryEventData = {
projectId: this.host.createHash(projectKey),

View File

@@ -1,4 +1,3 @@
/// <reference types="node" />
/// <reference path="shared.ts" />
/// <reference path="session.ts" />
@@ -7,7 +6,11 @@ namespace ts.server {
host: ServerHost;
cancellationToken: ServerCancellationToken;
canUseEvents: boolean;
installerEventPort: number;
/**
* If defined, specifies the socket used to send events to the client.
* Otherwise, events are sent through the host.
*/
eventPort?: number;
useSingleInferredProject: boolean;
useInferredProjectPerProjectRoot: boolean;
disableAutomaticTypingAcquisition: boolean;
@@ -22,10 +25,6 @@ namespace ts.server {
allowLocalPluginLoads: boolean;
}
const net: {
connect(options: { port: number }, onConnect?: () => void): NodeSocket
} = require("net");
const childProcess: {
fork(modulePath: string, args: string[], options?: { execArgv: string[], env?: MapLike<string> }): NodeChildProcess;
execFileSync(file: string, args: string[], options: { stdio: "ignore", env: MapLike<string> }): string | Buffer;
@@ -36,6 +35,14 @@ namespace ts.server {
tmpdir(): string;
} = require("os");
interface NodeSocket {
write(data: string, encoding: string): boolean;
}
const net: {
connect(options: { port: number }, onConnect?: () => void): NodeSocket
} = require("net");
function getGlobalTypingsCacheLocation() {
switch (process.platform) {
case "win32": {
@@ -83,10 +90,6 @@ namespace ts.server {
pid: number;
}
interface NodeSocket {
write(data: string, encoding: string): boolean;
}
interface ReadLineOptions {
input: NodeJS.ReadableStream;
output?: NodeJS.WritableStream;
@@ -243,10 +246,7 @@ namespace ts.server {
class NodeTypingsInstaller implements ITypingsInstaller {
private installer: NodeChildProcess;
private installerPidReported = false;
private socket: NodeSocket;
private projectService: ProjectService;
private eventSender: EventSender;
private activeRequestCount = 0;
private requestQueue: QueuedOperation[] = [];
private requestMap = createMap<QueuedOperation>(); // Maps operation ID to newest requestQueue entry with that ID
@@ -267,18 +267,11 @@ namespace ts.server {
private readonly telemetryEnabled: boolean,
private readonly logger: server.Logger,
private readonly host: ServerHost,
eventPort: number,
readonly globalTypingsCacheLocation: string,
readonly typingSafeListLocation: string,
readonly typesMapLocation: string,
private readonly npmLocation: string | undefined,
private newLine: string) {
if (eventPort) {
const s = net.connect({ port: eventPort }, () => {
this.socket = s;
this.reportInstallerProcessId();
});
}
private event: Event) {
}
isKnownTypesPackageName(name: string): boolean {
@@ -306,24 +299,6 @@ namespace ts.server {
});
}
private reportInstallerProcessId() {
if (this.installerPidReported) {
return;
}
if (this.socket && this.installer) {
this.sendEvent(0, "typingsInstallerPid", { pid: this.installer.pid });
this.installerPidReported = true;
}
}
private sendEvent(seq: number, event: string, body: any): void {
this.socket.write(formatMessage({ seq, type: "event", event, body }, this.logger, Buffer.byteLength, this.newLine), "utf8");
}
setTelemetrySender(telemetrySender: EventSender) {
this.eventSender = telemetrySender;
}
attach(projectService: ProjectService) {
this.projectService = projectService;
if (this.logger.hasLevel(LogLevel.requestTime)) {
@@ -363,7 +338,8 @@ namespace ts.server {
this.installer = childProcess.fork(combinePaths(__dirname, "typingsInstaller.js"), args, { execArgv });
this.installer.on("message", m => this.handleMessage(m));
this.reportInstallerProcessId();
this.event({ pid: this.installer.pid }, "typingsInstallerPid");
process.on("exit", () => {
this.installer.kill();
@@ -428,92 +404,81 @@ namespace ts.server {
break;
}
case EventInitializationFailed:
{
if (!this.eventSender) {
break;
}
const body: protocol.TypesInstallerInitializationFailedEventBody = {
message: response.message
};
const eventName: protocol.TypesInstallerInitializationFailedEventName = "typesInstallerInitializationFailed";
this.eventSender.event(body, eventName);
break;
}
case EventBeginInstallTypes:
{
if (!this.eventSender) {
break;
}
const body: protocol.BeginInstallTypesEventBody = {
eventId: response.eventId,
packages: response.packagesToInstall,
};
const eventName: protocol.BeginInstallTypesEventName = "beginInstallTypes";
this.eventSender.event(body, eventName);
break;
}
case EventEndInstallTypes:
{
if (!this.eventSender) {
break;
}
if (this.telemetryEnabled) {
const body: protocol.TypingsInstalledTelemetryEventBody = {
telemetryEventName: "typingsInstalled",
payload: {
installedPackages: response.packagesToInstall.join(","),
installSuccess: response.installSuccess,
typingsInstallerVersion: response.typingsInstallerVersion
}
{
const body: protocol.TypesInstallerInitializationFailedEventBody = {
message: response.message
};
const eventName: protocol.TelemetryEventName = "telemetry";
this.eventSender.event(body, eventName);
const eventName: protocol.TypesInstallerInitializationFailedEventName = "typesInstallerInitializationFailed";
this.event(body, eventName);
break;
}
case EventBeginInstallTypes:
{
const body: protocol.BeginInstallTypesEventBody = {
eventId: response.eventId,
packages: response.packagesToInstall,
};
const eventName: protocol.BeginInstallTypesEventName = "beginInstallTypes";
this.event(body, eventName);
break;
}
case EventEndInstallTypes:
{
if (this.telemetryEnabled) {
const body: protocol.TypingsInstalledTelemetryEventBody = {
telemetryEventName: "typingsInstalled",
payload: {
installedPackages: response.packagesToInstall.join(","),
installSuccess: response.installSuccess,
typingsInstallerVersion: response.typingsInstallerVersion
}
};
const eventName: protocol.TelemetryEventName = "telemetry";
this.event(body, eventName);
}
const body: protocol.EndInstallTypesEventBody = {
eventId: response.eventId,
packages: response.packagesToInstall,
success: response.installSuccess,
};
const eventName: protocol.EndInstallTypesEventName = "endInstallTypes";
this.eventSender.event(body, eventName);
break;
}
const body: protocol.EndInstallTypesEventBody = {
eventId: response.eventId,
packages: response.packagesToInstall,
success: response.installSuccess,
};
const eventName: protocol.EndInstallTypesEventName = "endInstallTypes";
this.event(body, eventName);
break;
}
case ActionInvalidate:
{
this.projectService.updateTypingsForProject(response);
break;
}
{
this.projectService.updateTypingsForProject(response);
break;
}
case ActionSet:
{
if (this.activeRequestCount > 0) {
this.activeRequestCount--;
}
else {
Debug.fail("Received too many responses");
}
while (this.requestQueue.length > 0) {
const queuedRequest = this.requestQueue.shift();
if (this.requestMap.get(queuedRequest.operationId) === queuedRequest) {
this.requestMap.delete(queuedRequest.operationId);
this.scheduleRequest(queuedRequest);
break;
{
if (this.activeRequestCount > 0) {
this.activeRequestCount--;
}
else {
Debug.fail("Received too many responses");
}
if (this.logger.hasLevel(LogLevel.verbose)) {
this.logger.info(`Skipping defunct request for: ${queuedRequest.operationId}`);
while (this.requestQueue.length > 0) {
const queuedRequest = this.requestQueue.shift();
if (this.requestMap.get(queuedRequest.operationId) === queuedRequest) {
this.requestMap.delete(queuedRequest.operationId);
this.scheduleRequest(queuedRequest);
break;
}
if (this.logger.hasLevel(LogLevel.verbose)) {
this.logger.info(`Skipping defunct request for: ${queuedRequest.operationId}`);
}
}
this.projectService.updateTypingsForProject(response);
this.event(response, "setTypings");
break;
}
this.projectService.updateTypingsForProject(response);
if (this.socket) {
this.sendEvent(0, "setTypings", response);
}
break;
}
default:
assertTypeIsNever(response);
}
@@ -529,11 +494,30 @@ namespace ts.server {
}
class IOSession extends Session {
private eventPort: number;
private eventSocket: NodeSocket | undefined;
private socketEventQueue: { body: any, eventName: string }[] | undefined;
private constructed: boolean | undefined;
constructor(options: IoSessionOptions) {
const { host, installerEventPort, globalTypingsCacheLocation, typingSafeListLocation, typesMapLocation, npmLocation, canUseEvents } = options;
const { host, eventPort, globalTypingsCacheLocation, typingSafeListLocation, typesMapLocation, npmLocation, canUseEvents } = options;
const event: Event | undefined = (body: {}, eventName: string) => {
if (this.constructed) {
this.event(body, eventName);
}
else {
// It is unsafe to dereference `this` before initialization completes,
// so we defer until the next tick.
//
// Construction should finish before the next tick fires, so we do not need to do this recursively.
setImmediate(() => this.event(body, eventName));
}
};
const typingsInstaller = disableAutomaticTypingAcquisition
? undefined
: new NodeTypingsInstaller(telemetryEnabled, logger, host, installerEventPort, globalTypingsCacheLocation, typingSafeListLocation, typesMapLocation, npmLocation, host.newLine);
: new NodeTypingsInstaller(telemetryEnabled, logger, host, globalTypingsCacheLocation, typingSafeListLocation, typesMapLocation, npmLocation, event);
super({
host,
@@ -547,11 +531,49 @@ namespace ts.server {
canUseEvents,
globalPlugins: options.globalPlugins,
pluginProbeLocations: options.pluginProbeLocations,
allowLocalPluginLoads: options.allowLocalPluginLoads });
allowLocalPluginLoads: options.allowLocalPluginLoads
});
if (telemetryEnabled && typingsInstaller) {
typingsInstaller.setTelemetrySender(this);
this.eventPort = eventPort;
if (this.canUseEvents && this.eventPort) {
const s = net.connect({ port: this.eventPort }, () => {
this.eventSocket = s;
if (this.socketEventQueue) {
// flush queue.
for (const event of this.socketEventQueue) {
this.writeToEventSocket(event.body, event.eventName);
}
this.socketEventQueue = undefined;
}
});
}
this.constructed = true;
}
event<T>(body: T, eventName: string): void {
Debug.assert(this.constructed, "Should only call `IOSession.prototype.event` on an initialized IOSession");
if (this.canUseEvents && this.eventPort) {
if (!this.eventSocket) {
if (this.logger.hasLevel(LogLevel.verbose)) {
this.logger.info(`eventPort: event "${eventName}" queued, but socket not yet initialized`);
}
(this.socketEventQueue || (this.socketEventQueue = [])).push({ body, eventName });
return;
}
else {
Debug.assert(this.socketEventQueue === undefined);
this.writeToEventSocket(body, eventName);
}
}
else {
super.event(body, eventName);
}
}
private writeToEventSocket(body: any, eventName: string): void {
this.eventSocket.write(formatMessage(toEvent(body, eventName), this.logger, this.byteLength, this.host.newLine), "utf8");
}
exit() {
@@ -896,7 +918,7 @@ namespace ts.server {
cancellationToken = nullCancellationToken;
}
let eventPort: number;
let eventPort: number | undefined;
{
const str = findArgument("--eventPort");
const v = str && parseInt(str);
@@ -936,8 +958,8 @@ namespace ts.server {
const options: IoSessionOptions = {
host: sys,
cancellationToken,
installerEventPort: eventPort,
canUseEvents: eventPort === undefined,
eventPort,
canUseEvents: true,
useSingleInferredProject,
useInferredProjectPerProjectRoot,
disableAutomaticTypingAcquisition,

View File

@@ -105,10 +105,6 @@ namespace ts.server {
project: Project;
}
export interface EventSender {
event<T>(payload: T, eventName: string): void;
}
function allEditsBeforePos(edits: ts.TextChange[], pos: number) {
for (const edit of edits) {
if (textSpanEnd(edit.span) >= pos) {
@@ -243,6 +239,22 @@ namespace ts.server {
}
}
export type Event = <T>(body: T, eventName: string) => void;
export interface EventSender {
event: Event;
}
/** @internal */
export function toEvent(eventName: string, body: {}): protocol.Event {
return {
seq: 0,
type: "event",
event: eventName,
body
};
}
export interface SessionOptions {
host: ServerHost;
cancellationToken: ServerCancellationToken;
@@ -252,6 +264,9 @@ namespace ts.server {
byteLength: (buf: string, encoding?: string) => number;
hrtime: (start?: number[]) => number[];
logger: Logger;
/**
* If falsy, all events are suppressed.
*/
canUseEvents: boolean;
eventHandler?: ProjectServiceEventHandler;
throttleWaitMilliseconds?: number;
@@ -269,15 +284,15 @@ namespace ts.server {
private currentRequestId: number;
private errorCheck: MultistepOperation;
private eventHandler: ProjectServiceEventHandler;
private host: ServerHost;
protected host: ServerHost;
private readonly cancellationToken: ServerCancellationToken;
protected readonly typingsInstaller: ITypingsInstaller;
private byteLength: (buf: string, encoding?: string) => number;
protected byteLength: (buf: string, encoding?: string) => number;
private hrtime: (start?: number[]) => number[];
protected logger: Logger;
private canUseEvents: boolean;
protected canUseEvents: boolean;
private eventHandler: ProjectServiceEventHandler;
constructor(opts: SessionOptions) {
this.host = opts.host;
@@ -293,7 +308,6 @@ namespace ts.server {
this.eventHandler = this.canUseEvents
? opts.eventHandler || (event => this.defaultEventHandler(event))
: undefined;
const multistepOperationHost: MultistepOperationHost = {
executeWithRequestId: (requestId, action) => this.executeWithRequestId(requestId, action),
getCurrentRequestId: () => this.currentRequestId,
@@ -321,13 +335,7 @@ namespace ts.server {
}
private sendRequestCompletedEvent(requestId: number): void {
const event: protocol.RequestCompletedEvent = {
seq: 0,
type: "event",
event: "requestCompleted",
body: { request_seq: requestId }
};
this.send(event);
this.event<protocol.RequestCompletedEventBody>({ request_seq: requestId }, "requestCompleted");
}
private defaultEventHandler(event: ProjectServiceEvent) {
@@ -401,17 +409,12 @@ namespace ts.server {
this.host.write(formatMessage(msg, this.logger, this.byteLength, this.host.newLine));
}
public event<T>(info: T, eventName: string) {
const ev: protocol.Event = {
seq: 0,
type: "event",
event: eventName,
body: info
};
this.send(ev);
public event<T>(body: T, eventName: string): void {
this.send(toEvent(eventName, body));
}
// For backwards-compatibility only.
/** @deprecated */
public output(info: any, cmdName: string, reqSeq?: number, errorMsg?: string): void {
this.doOutput(info, cmdName, reqSeq, /*success*/ !errorMsg, errorMsg);
}