From f563ff252e4ca38bf4469bb130e29d8fa464635f Mon Sep 17 00:00:00 2001 From: Ron Buckton Date: Fri, 6 Oct 2023 17:42:51 -0400 Subject: [PATCH] cleanup unused coordination primitives --- src/compiler/_namespaces/ts.ts | 3 +- src/compiler/core.ts | 45 +++++-- src/compiler/debug.ts | 5 + src/compiler/program.ts | 51 +------- .../sharing/collections/concurrentMap.ts | 55 ++++---- src/compiler/sharing/sharedParserState.ts | 8 +- src/compiler/sharing/structs/wrapper.ts | 65 ---------- .../{sharing => threading}/atomicValue.ts | 64 ++++++++-- src/compiler/threading/countdownEvent.ts | 118 ------------------ src/compiler/threading/manualResetEvent.ts | 45 ------- src/compiler/threading/semaphore.ts | 93 -------------- src/compiler/threading/spinWait.ts | 53 +++++--- .../unittests/sharing/concurrentMap.ts | 66 +++++----- 13 files changed, 213 insertions(+), 458 deletions(-) delete mode 100644 src/compiler/sharing/structs/wrapper.ts rename src/compiler/{sharing => threading}/atomicValue.ts (55%) delete mode 100644 src/compiler/threading/countdownEvent.ts delete mode 100644 src/compiler/threading/manualResetEvent.ts delete mode 100644 src/compiler/threading/semaphore.ts diff --git a/src/compiler/_namespaces/ts.ts b/src/compiler/_namespaces/ts.ts index a5a61d16be5..6cede5dd369 100644 --- a/src/compiler/_namespaces/ts.ts +++ b/src/compiler/_namespaces/ts.ts @@ -16,6 +16,7 @@ export * from "../sharing/structs/shareable"; export * from "../sharing/structs/sharedStruct"; export * from "../sharing/structs/taggedStruct"; export * from "../sharing/structs/wrapper"; +export * from "../sharing/collections/deque"; export * from "../sharing/collections/hashData"; export * from "../sharing/collections/sharedLinkedList"; export * from "../sharing/collections/concurrentMap"; @@ -31,9 +32,7 @@ export * from "../sharing/sharedObjectAllocator"; export * from "../sharing/sharedParserState"; export * from "../sharing/sharedSymbol"; export * from "../threading/condition"; -export * from "../threading/countdownEvent"; export * from "../threading/lockable"; -export * from "../threading/manualResetEvent"; export * from "../threading/mutex"; export * from "../threading/scopedLock"; export * from "../threading/sharedLockable"; diff --git a/src/compiler/core.ts b/src/compiler/core.ts index b0c038ca842..4b4b42c9a38 100644 --- a/src/compiler/core.ts +++ b/src/compiler/core.ts @@ -132,8 +132,9 @@ export function zipWith(sequenceA: Sequence, sequenceB: Sequence, const result: V[] = []; const arrayA = getArrayLike(sequenceA); const arrayB = getArrayLike(sequenceB); - Debug.assertEqual(arrayA.length, arrayB.length); - for (let i = 0; i < arrayA.length; i++) { + const length = arrayA.length; + Debug.assertEqual(length, arrayB.length); + for (let i = 0; i < length; i++) { result.push(callback(arrayA[i], arrayB[i], i)); } return result; @@ -289,7 +290,7 @@ export function contains(sequence: Sequence | undefined, value: T, equalit export function arraysEqual(sequenceA: Sequence, sequenceB: Sequence, equalityComparer: EqualityComparer = equateValues): boolean { const arrayA = getArrayLike(sequenceA); const arrayB = getArrayLike(sequenceB); - return arrayA.length === arrayB.length && Array.prototype.every.call(arrayA, (x: T, i: number) => equalityComparer(x, arrayB[i])); + return arrayA.length === arrayB.length && Array.prototype.every.call(arrayA, (x, i) => equalityComparer(x, arrayB[i])); } /** @internal */ @@ -340,13 +341,22 @@ export function filter(array: readonly T[] | undefined, f: (x: T /** @internal */ export function filter(array: readonly T[] | undefined, f: (x: T) => boolean): readonly T[] | undefined; /** @internal */ -export function filter(array: readonly T[] | undefined, f: (x: T) => boolean): readonly T[] | undefined { +export function filter(array: SharedArray, f: (x: T) => x is U): SharedArray; +/** @internal */ +export function filter(array: SharedArray, f: (x: T) => boolean): SharedArray; +/** @internal */ +export function filter(array: SharedNodeArray, f: (x: T) => x is U): SharedArray | SharedNodeArray; +/** @internal */ +export function filter(array: SharedNodeArray, f: (x: T) => boolean): SharedArray | SharedNodeArray; +/** @internal */ +export function filter(sequence: Sequence | undefined, f: (x: T) => boolean) { + const array = getArrayLike(sequence); if (array) { const len = array.length; let i = 0; while (i < len && f(array[i])) i++; if (i < len) { - const result = array.slice(0, i); + const result = Array.prototype.slice.call(array, 0, i); i++; while (i < len) { const item = array[i]; @@ -355,10 +365,10 @@ export function filter(array: readonly T[] | undefined, f: (x: T) => boolean) } i++; } - return result; + return isShareableNonPrimitive(sequence) ? sharedArrayFrom(result) : result; } } - return array; + return sequence; } /** @internal */ @@ -454,7 +464,6 @@ export function flatten(array: T[][] | readonly (T | readonly T[] | undefined return result; } - /** * Maps an array. If the mapped value is an array, it is spread into the result. * @@ -1479,6 +1488,26 @@ export function arrayFrom(iterator: Iterable, map?: (t: T) => U): (T | return result; } +/** + * Shims `Array.from`. + * + * @internal + */ +export function sharedArrayFrom(iterator: Iterable, map: (t: T) => U): SharedArray; +/** @internal */ +export function sharedArrayFrom(iterator: Iterable): SharedArray; +/** @internal */ +export function sharedArrayFrom(iterator: Iterable | readonly T[] | SharedArray, map?: (t: T) => T): SharedArray { + const array: ArrayLike = isArray(iterator) || isSharedArray(iterator) ? iterator : arrayFrom(iterator); + const length = array.length; + const sharedArray = new SharedArray(length); + for (let i = 0; i < length; i++) { + const value = array[i]; + sharedArray[i] = map ? map(value) : value; + } + return sharedArray; +} + /** @internal */ export function assign(t: T, ...args: (T | undefined)[]) { for (const arg of args) { diff --git a/src/compiler/debug.ts b/src/compiler/debug.ts index abe3f11303b..39ed02cdc4a 100644 --- a/src/compiler/debug.ts +++ b/src/compiler/debug.ts @@ -7,6 +7,7 @@ import { compareValues, EmitFlags, every, + FileIncludeKind, FlowFlags, FlowLabel, FlowNode, @@ -519,6 +520,10 @@ export namespace Debug { return formatEnum(facts, (ts as any).TypeFacts, /*isFlags*/ true); } + export function formatFileIncludeKind(kind: FileIncludeKind): string { + return formatEnum(kind, (ts as any).FileIncludeKind, /*isflags*/ false); + } + let isDebugInfoEnabled = false; let flowNodeProto: FlowNodeBase | undefined; diff --git a/src/compiler/program.ts b/src/compiler/program.ts index d9064e48b81..971f0cab2dc 100644 --- a/src/compiler/program.ts +++ b/src/compiler/program.ts @@ -292,6 +292,8 @@ import { setParentRecursive, setResolvedModule, setResolvedTypeReferenceDirective, + Shared, + SharedStructBase, shouldResolveJsRequire, skipTrivia, skipTypeChecking, @@ -420,12 +422,11 @@ export function makeCompilerHostParallel( Debug.assert(!host.threadPool && !host.requestSourceFile, "Compiler host may already be parallelized."); const sharedEntryMap = new Map(); - const parserState = new SharedParserState(); + const parserState = new SharedParserState(threadPool.poolSize); const savedGetSourceFile = host.getSourceFile; host.threadPool = threadPool; host.requestSourceFile = (fileName, languageVersionOrOptions, shouldCreateNewSourceFile) => { using _measure = performance.measureActivity("Parallel: Request Source File", "beforeParserPausedRequest", "afterParserPausedRequest"); - if (!ConcurrentMap.has(parserState.files, fileName)) { const entry = new SharedSourceFileEntry( parserState, @@ -440,42 +441,11 @@ export function makeCompilerHostParallel( threadPool.queueWorkItem("Program.requestSourceFile", entry); } } - - // // quick check before allocating an entry - // { - // using _ = new SharedLock(parserState.sharedMutex); - // if (SharedMap.has(parserState.files, fileName)) { - // return; - // } - // } - - // const entry = new SharedSourceFileEntry( - // parserState, - // !!host.createHash, - // !!setParentNodes, - // fileName, - // typeof languageVersionOrOptions === "object" ? languageVersionOrOptions.languageVersion : languageVersionOrOptions, - // typeof languageVersionOrOptions === "object" ? languageVersionOrOptions.impliedNodeFormat : undefined, - // shouldCreateNewSourceFile); - - // using _ = new UniqueLock(parserState.sharedMutex); - // if (!SharedMap.has(parserState.files, fileName)) { - // SharedMap.set(parserState.files, fileName, entry); - // // we inserted the entry, queue a task to parse it - // threadPool.queueWorkItem("Program.requestSourceFile", entry); - // } }; host.getSourceFile = (fileName, languageVersionOrOptions, onError, shouldCreateNewSourceFile) => { using _ = performance.measureActivity("Parallel: Get Source File", "beforeParserPausedRead", "afterParserPausedRead"); const sharedFileEntry = ConcurrentMap.get(parserState.files, fileName); - - // let sharedFileEntry: SharedSourceFileEntry | undefined; - // { - // using _ = new SharedLock(parserState.sharedMutex); - // sharedFileEntry = SharedMap.get(parserState.files, fileName); - // } - if (sharedFileEntry) { let file = sharedEntryMap.get(sharedFileEntry); if (file !== undefined) { @@ -3706,7 +3676,7 @@ export function createProgram(rootNamesOrOptions: readonly string[] | CreateProg using _ = tracing?.traceActivity(tracing.Phase.Program, "findSourceFile", { fileName, isDefaultLib: isDefaultLib || undefined, - fileIncludeKind: (FileIncludeKind as any)[reason.kind], + fileIncludeKind: Debug.formatFileIncludeKind(reason.kind), }); const path = toPath(fileName); @@ -4039,18 +4009,7 @@ export function createProgram(rootNamesOrOptions: readonly string[] | CreateProg reason: FileIncludeReason, currentNodeModulesDepth: number ) { - tracing?.push(tracing.Phase.Program, "processTypeReferenceDirective", { directive: typeReferenceDirective, hasResolved: !!resolution.resolvedTypeReferenceDirective, refKind: reason.kind, refPath: isReferencedFile(reason) ? reason.file : undefined }); - yield* processTypeReferenceDirectiveWorker(typeReferenceDirective, mode, resolution, reason, currentNodeModulesDepth); - tracing?.pop(); - } - - function* processTypeReferenceDirectiveWorker( - typeReferenceDirective: string, - mode: ResolutionMode, - resolution: ResolvedTypeReferenceDirectiveWithFailedLookupLocations, - reason: FileIncludeReason, - currentNodeModulesDepth: number - ) { + using _ = tracing?.traceActivity(tracing.Phase.Program, "processTypeReferenceDirective", { directive: typeReferenceDirective, hasResolved: !!resolution.resolvedTypeReferenceDirective, refKind: reason.kind, refPath: isReferencedFile(reason) ? reason.file : undefined }); let hasRecordedResolutionDiagnostics = false; function* recordResolutionDiagnosticsIfNeeded(needsYield = true) { if (!hasRecordedResolutionDiagnostics) { diff --git a/src/compiler/sharing/collections/concurrentMap.ts b/src/compiler/sharing/collections/concurrentMap.ts index 704f759ca57..0b699153d83 100644 --- a/src/compiler/sharing/collections/concurrentMap.ts +++ b/src/compiler/sharing/collections/concurrentMap.ts @@ -1,13 +1,12 @@ import { iterateValues } from "../../core"; import { Debug } from "../../debug"; import { sys } from "../../sys"; +import { AtomicValue } from "../../threading/atomicValue"; import { Mutex } from "../../threading/mutex"; import { ScopedLock } from "../../threading/scopedLock"; import { UniqueLock } from "../../threading/uniqueLock"; -import { AtomicValue } from "../atomicValue"; import { Shared, SharedStructBase } from "../structs/sharedStruct"; import { Tag, Tagged } from "../structs/taggedStruct"; -import { StructWrapperTypes } from "../structs/wrapper"; import { generateHashSeed, identityHash } from "./hash"; import { getPrime } from "./hashData"; @@ -62,35 +61,20 @@ type WILDCARD = typeof WILDCARD; type EXIST = typeof EXIST; type NOT_EXIST = typeof NOT_EXIST; -/** @internal */ +/** + * A concurrent Map-like object. Based on https://github.com/dotnet/runtime/blob/main/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentDictionary.cs + * @internal + */ @Shared() export class ConcurrentMap, V extends NonNullable> extends Tagged(SharedStructBase, Tag.ConcurrentMap) { - declare [StructWrapperTypes]: [ - [typeof ConcurrentMap, { - size(): number; - has(key: K): boolean; - get(key: K): V | undefined; - set(key: K, value: V): void; - delete(key: K, expectedValue?: V): V | undefined; - insert(key: K, value: V): boolean; - replace(key: K, expectedValue: V, replacementValue: V): V | undefined; - exchange(key: K, value: V | undefined): V | undefined; - compareExchange(key: K, expectedValue: V | undefined, replacementValue: V | undefined): V | undefined; - clear(): void; - keys(): IterableIterator; - values(): IterableIterator; - entries(): IterableIterator<[K, V]>; - }] - ]; - @Shared() private readonly _tables: AtomicValue>; @Shared() private _budget: number; @Shared() private readonly _growLockArray: boolean; constructor(); constructor(items: ConcurrentMap | Iterable<[K, V]>); + constructor(concurrencyLevel?: number, capacity?: number); constructor(concurrencyLevel: number, items: ConcurrentMap | Iterable<[K, V]>); - constructor(concurrencyLevel: number, capacity: number); constructor(concurrencyLevelOrItems?: number | ConcurrentMap | Iterable<[K, V]>, capacityOrItems?: number | ConcurrentMap | Iterable<[K, V]>) { let concurrencyLevel: number | undefined; let capacity: number | undefined; @@ -514,3 +498,30 @@ function isLockFree(value: Shareable) { if (typeof value === "number") return isFinite(value) && (value === (value >>> 0) || value === (value >> 0)); return true; } + +/** + * A non-shared wrapper for {@link ConcurrentMap} objects. + * @internal + */ +export class ConcurrentMapWrapper, V extends NonNullable> { + private self: ConcurrentMap; + + constructor(map: ConcurrentMap) { + this.self = map; + } + + get size(): number { return ConcurrentMap.size(this.self); } + has(key: K): boolean { return ConcurrentMap.has(this.self, key); } + get(key: K): V | undefined { return ConcurrentMap.get(this.self, key); } + set(key: K, value: V): this { return ConcurrentMap.set(this.self, key, value), this; } + delete(key: K, expectedValue?: V): V | undefined { return ConcurrentMap.delete(this.self, key, expectedValue); } + insert(key: K, value: V): boolean { return ConcurrentMap.insert(this.self, key, value); } + replace(key: K, expectedValue: V, replacementValue: V): boolean { return ConcurrentMap.replace(this.self, key, expectedValue, replacementValue); } + exchange(key: K, value: V | undefined): V | undefined { return ConcurrentMap.exchange(this.self, key, value); } + compareExchange(key: K, expectedValue: V | undefined, replacementValue: V | undefined): V | undefined { return ConcurrentMap.compareExchange(this.self, key, expectedValue, replacementValue); } + clear(): void { return ConcurrentMap.clear(this.self); } + keys(): IterableIterator { return ConcurrentMap.keys(this.self); } + values(): IterableIterator { return ConcurrentMap.values(this.self); } + entries(): IterableIterator<[K, V]> { return ConcurrentMap.entries(this.self); } + [Symbol.iterator]() { return this.entries(); } +} diff --git a/src/compiler/sharing/sharedParserState.ts b/src/compiler/sharing/sharedParserState.ts index 9261c5ee1eb..4dc8b135195 100644 --- a/src/compiler/sharing/sharedParserState.ts +++ b/src/compiler/sharing/sharedParserState.ts @@ -8,9 +8,11 @@ import { Shared, SharedStructBase } from "./structs/sharedStruct"; /** @internal */ @Shared() export class SharedParserState extends SharedStructBase { - // @Shared() sharedMutex = new SharedMutex(); - // @Shared() files = new SharedMap(); - @Shared() files = new ConcurrentMap(); + @Shared() files: ConcurrentMap; + constructor(concurrencyLevel?: number) { + super(); + this.files = new ConcurrentMap(concurrencyLevel); + } } /** @internal */ diff --git a/src/compiler/sharing/structs/wrapper.ts b/src/compiler/sharing/structs/wrapper.ts deleted file mode 100644 index f561a44d0ea..00000000000 --- a/src/compiler/sharing/structs/wrapper.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { Debug } from "../../debug"; - -/** - * Used in a `declare` instance field of a struct type to override the type we use for a wrapper object generated for - * the struct. We do this because we cannot properly infer correct generic instantiations from generic static methods. - * This declaration intentionally does not produce an actual value as it should only be used in an ambient context. - * @internal - */ -export declare const StructWrapperTypes: unique symbol; - -/** @internal */ -export type StructWrapper = - TInstance extends { [StructWrapperTypes]: infer T extends [any, any][] } ? - T[number] extends [TStatics, infer U] ? U : - SynthesizedStructWrapper : - SynthesizedStructWrapper; - -type SynthesizedStructInstanceFields = { - [P in keyof TInstance & string as Exclude]: TInstance[P]; -}; - -type SynthesizedStructInstanceMethods = { - [P in keyof TStatics & string as TStatics[P] extends (self: TInstance, ...args: any) => any ? Exclude : never]: - TStatics[P] extends (self: TInstance, ...args: infer A) => infer R ? (...args: A) => R : never; -}; - -type SynthesizedStructWrapper = - & SynthesizedStructInstanceFields - & SynthesizedStructInstanceMethods - ; - -/** - * Generates a simple, shallow proxy for a shared struct that treats static methods as instance methods with the first - * parameter bound to the instance. - * @internal - */ -export function wrapStruct(instance: TInstance, statics: TStatics): StructWrapper { - const wrapper = {} as any; - for (const key of Reflect.ownKeys(instance) as Iterable) { - if (typeof key !== "string") continue; - if (key === "__tag__" || key == "__hash__") continue; - Object.defineProperty(wrapper, key, { - enumerable: true, - configurable: false, - get: () => instance[key], - set: (value: TInstance[typeof key]) => instance[key] = value - }); - } - - for (const key of Reflect.ownKeys(statics) as Iterable) { - if (typeof key !== "string") continue; - if (key.startsWith("_")) continue; - const value = statics[key]; - if (typeof value !== "function") continue; - if (Object.prototype.hasOwnProperty.call(value, key)) Debug.fail(`static method name '${key}' conflicts with same named instance field`); - Object.defineProperty(wrapper, key, { - enumerable: false, - configurable: false, - writable: false, - value: value.bind(/*thisArg*/ undefined, instance) - }); - } - - return wrapper as StructWrapper; -} diff --git a/src/compiler/sharing/atomicValue.ts b/src/compiler/threading/atomicValue.ts similarity index 55% rename from src/compiler/sharing/atomicValue.ts rename to src/compiler/threading/atomicValue.ts index 91e82de9d03..7d5d312c5cc 100644 --- a/src/compiler/sharing/atomicValue.ts +++ b/src/compiler/threading/atomicValue.ts @@ -1,8 +1,17 @@ -import { Shared, SharedStructBase } from "./structs/sharedStruct"; +import { spin } from "./spinWait"; +import { Shared, SharedStructBase } from "../sharing/structs/sharedStruct"; -/** @internal */ +/** + * Atomically read and write a shared value. While this isn't strictly necessary as `Atomics` methods can be used + * directly, it provides a convenient mechanism for updating fields marked with the TypeScript `private` keyword. + * @internal + */ @Shared() export class AtomicValue extends SharedStructBase { + /** + * The held value. It is not recommended to read or write from this field directly as such operations are neither + * atomic nor sequentially consistent. Instead, you should use {@link AtomicValue.load} and {@link AtomicValue.store}. + */ @Shared() unsafeValue: T; constructor(value: T) { @@ -10,18 +19,36 @@ export class AtomicValue extends SharedStructBase { this.unsafeValue = value; } + /** + * Atomically loads and returns the underlying value. This operation is guaranteed to be sequentially consistent + * across multiple threads/agents. + */ static load(self: AtomicValue) { return Atomics.load(self, "unsafeValue"); } + /** + * Atomically replaces the underlying value with the provided value. This operation is guaranteed to be sequentially + * consistent across multiple threads/agents. + */ static store(self: AtomicValue, value: T) { - return Atomics.store(self, "unsafeValue", value); + Atomics.store(self, "unsafeValue", value); } + /** + * Atomically replaces the underlying value with the provided value, returning the previous value as a + * read-modify-write operation. This operation is guaranteed to be sequentially consistent across multiple + * threads/agents. + */ static exchange(self: AtomicValue, value: T) { return Atomics.exchange(self, "unsafeValue", value); } + /** + * Atomically replaces the underlying value with the provided replacement value if the underling value and expected + * value are the same, returning the previous value as a read-modify-write operation. This operation is guaranteed + * to be sequentially consistent across multiple threads/agents. + */ static compareExchange(self: AtomicValue, expectedValue: T, replacementValue: T) { return Atomics.compareExchange(self, "unsafeValue", expectedValue, replacementValue); } @@ -30,33 +57,56 @@ export class AtomicValue extends SharedStructBase { return expectedValue === AtomicValue.compareExchange(self, expectedValue, replacementValue); } + /** + * Performs 32-bit signed integer addition via an atomic read-modify-write operation. + * @returns the value prior to addition. + */ static add(self: AtomicValue, value: number) { + let spinCounter = 0; let currentValue = AtomicValue.load(self); - while (!AtomicValue.compareAndSet(self, currentValue, (currentValue + value) >> 0)) { + while (currentValue !== AtomicValue.compareExchange(self, currentValue, (currentValue + value) >> 0)) { + spinCounter = spin(spinCounter); currentValue = AtomicValue.load(self); } return currentValue; } + /** + * Performs 32-bit signed integer subtraction via an atomic read-modify-write operation. + * @returns the value prior to subtraction. + */ static sub(self: AtomicValue, value: number) { + let spinCounter = 0; let currentValue = AtomicValue.load(self); - while (!AtomicValue.compareAndSet(self, currentValue, (currentValue - value) >> 0)) { + while (currentValue !== AtomicValue.compareExchange(self, currentValue, (currentValue - value) >> 0)) { + spinCounter = spin(spinCounter); currentValue = AtomicValue.load(self); } return currentValue; } + /** + * Atomically increments a 32-bit signed integer value. + * @returns the value prior to subtraction. + */ static increment(self: AtomicValue) { return AtomicValue.add(self, 1); } + /** + * Atomically decrements a 32-bit signed integer value. + * @returns the value prior to subtraction. + */ static decrement(self: AtomicValue) { return AtomicValue.sub(self, 1); } } -/** @internal */ -export class AtomicRef { +/** + * A non-shared wrapper for an {@link AtomicValue}. + * @internal + */ +export class AtomicRef implements Disposable { private _value: AtomicValue | undefined; constructor(value: AtomicValue | undefined) { diff --git a/src/compiler/threading/countdownEvent.ts b/src/compiler/threading/countdownEvent.ts deleted file mode 100644 index 58fd2162f5e..00000000000 --- a/src/compiler/threading/countdownEvent.ts +++ /dev/null @@ -1,118 +0,0 @@ -import { Condition, Debug, isTaggedStruct, Mutex, Shared, SharedStructBase, Tag, Tagged, UniqueLock } from "../_namespaces/ts"; - -const MAX_INT32 = 2 ** 31 - 1; - -/** @internal */ -@Shared() -export class CountdownEvent extends Tagged(SharedStructBase, Tag.CountdownEvent) { - @Shared() private _mutex: Mutex | undefined = new Mutex(); - @Shared() private _condition: Condition | undefined = new Condition(); - @Shared() private _signaled = false; - @Shared() private _initialCount: number; - @Shared() private _remainingCount: number = 0; - - constructor(initialCount = 0) { - super(); - initialCount |= 0; - Debug.assert(initialCount >= 0, "initialCount out of range"); - this._initialCount = initialCount; - this._remainingCount = initialCount; - this._signaled = initialCount === 0; - } - - static initialCount(self: CountdownEvent) { - return self._initialCount; - } - - static remainingCount(self: CountdownEvent) { - return self._remainingCount; - } - - static isSet(self: CountdownEvent) { - return self._remainingCount === 0; - } - - static add(self: CountdownEvent, count?: number) { - Debug.assert(CountdownEvent.tryAdd(self, count), "event is already signaled"); - } - - static tryAdd(self: CountdownEvent, count = 1) { - count |= 0; - Debug.assert(count >= 1, "count out of range"); - Debug.assert(self._mutex, "object disposed"); - using _ = new UniqueLock(self._mutex); - if (self._remainingCount === 0) return false; - Debug.assert(self._remainingCount <= MAX_INT32 - count, "count out of range"); - self._remainingCount += count; - return true; - } - - static reset(self: CountdownEvent, count?: number) { - if (count !== undefined) { - count |= 0; - Debug.assert(count >= 0, "count out of range"); - } - - Debug.assert(self._mutex && self._condition, "object disposed"); - using _ = new UniqueLock(self._mutex); - count ??= self._initialCount; - self._remainingCount = count; - self._initialCount = count; - if (count === 0) { - if (!self._signaled) { - self._signaled = true; - Condition.notify(self._condition); - } - } - else { - self._signaled = false; - } - } - - static signal(self: CountdownEvent, count = 1) { - count |= 0; - Debug.assert(count >= 1, "count out of range"); - - Debug.assert(self._mutex && self._condition, "object disposed"); - using _ = new UniqueLock(self._mutex); - Debug.assert(self._remainingCount >= count, "count out of range"); - self._remainingCount = self._remainingCount - count; - if (self._remainingCount === 0) { - if (!self._signaled) { - self._signaled = true; - Condition.notify(self._condition); - } - return true; - } - return false; - } - - static wait(self: CountdownEvent, timeout?: number) { - if (timeout !== undefined) { - timeout = Math.max(0, timeout | 0); - } - - Debug.assert(self._mutex && self._condition, "object disposed"); - if (CountdownEvent.isSet(self)) { - return true; - } - - using lck = new UniqueLock(self._mutex); - if (timeout !== undefined) { - return Condition.waitFor(self._condition, lck, timeout) !== "timed-out"; - } - else { - Condition.wait(self._condition, lck); - return true; - } - } - - static close(self: CountdownEvent): void { - self._mutex = undefined; - self._condition = undefined; - } - - static [Symbol.hasInstance](value: unknown): value is CountdownEvent { - return isTaggedStruct(value, Tag.CountdownEvent); - } -} diff --git a/src/compiler/threading/manualResetEvent.ts b/src/compiler/threading/manualResetEvent.ts deleted file mode 100644 index fdd5f7fcd46..00000000000 --- a/src/compiler/threading/manualResetEvent.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { Shared, SharedStructBase } from "../sharing/structs/sharedStruct"; -import { isTaggedStruct, Tag, Tagged } from "../sharing/structs/taggedStruct"; - -/** @internal */ -@Shared() -export class ManualResetEvent extends Tagged(SharedStructBase, Tag.ManualResetEvent) { - @Shared() private mutex = new Atomics.Mutex(); - @Shared() private condition = new Atomics.Condition(); - @Shared() private signaled = false; - - static isSet(self: ManualResetEvent) { - return self.signaled; - } - - static set(self: ManualResetEvent) { - let result = false; - Atomics.Mutex.tryLock(self.mutex, () => { - if (!self.signaled) { - self.signaled = true; - Atomics.Condition.notify(self.condition); - result = true; - } - }); - return result; - } - - static reset(self: ManualResetEvent) { - let result = false; - Atomics.Mutex.tryLock(self.mutex, () => { - if (self.signaled) { - self.signaled = false; - result = true; - } - }); - return result; - } - - static wait(self: ManualResetEvent, timeout?: number) { - return Atomics.Mutex.lock(self.mutex, () => Atomics.Condition.wait(self.condition, self.mutex, timeout)); - } - - static [Symbol.hasInstance](value: unknown): value is ManualResetEvent { - return isTaggedStruct(value, Tag.ManualResetEvent); - } -} \ No newline at end of file diff --git a/src/compiler/threading/semaphore.ts b/src/compiler/threading/semaphore.ts deleted file mode 100644 index ffa9116a73c..00000000000 --- a/src/compiler/threading/semaphore.ts +++ /dev/null @@ -1,93 +0,0 @@ -import { Condition, Debug, Mutex, Shared, SharedStructBase, Tag, Tagged, UniqueLock } from "../_namespaces/ts"; -import { AtomicValue } from "../sharing/atomicValue"; - -/** @internal */ -@Shared() -export class Semaphore extends Tagged(SharedStructBase, Tag.Semaphore) { - @Shared() private readonly _maxCount: number; - @Shared() private readonly _currentCount: AtomicValue; - @Shared() private readonly _waitCount = new AtomicValue(0); - @Shared() private readonly _mutex = new Mutex(); - @Shared() private readonly _condition = new Condition(); - - constructor(initialCount: number, maxCount: number = 2 ** 31 - 1) { - Debug.assert(initialCount === (initialCount | 0) && initialCount >= 0); - Debug.assert(maxCount >= 0 && maxCount >= initialCount); - super(); - this._maxCount = maxCount; - this._currentCount = new AtomicValue(initialCount); - } - - static count(self: Semaphore) { - return AtomicValue.load(self._currentCount); - } - - static tryAcquire(self: Semaphore, timeout?: number) { - if (timeout !== undefined) { - if (isNaN(timeout) || isFinite(timeout)) timeout |= 0; - if (timeout < 0) timeout = 0; - } - - // first, try to acquire a count lock-free - let count = AtomicValue.load(self._currentCount); - if (count > 0 && count === (count = AtomicValue.compareExchange(self._currentCount, count, count - 1))) { - return true; - } - if (timeout === undefined) { - return false; - } - - const start = Date.now(); - let remaining: number; - - // if that fails, take a lock and wait for a spot to open up - using lck = new UniqueLock(self._mutex); - using _waiter = waitScope(self._waitCount); - do { - remaining = timeout - (Date.now() - start); - if (count <= 0) { - if (Condition.waitFor(self._condition, lck, remaining) === "timed-out") { - return false; - } - count = AtomicValue.load(self._currentCount); - } - else if (count === (count = AtomicValue.compareExchange(self._currentCount, count, count - 1))) { - return true; - } - } - while (remaining >= 0); - return false; - } - - static acquire(self: Semaphore) { - Semaphore.tryAcquire(self, Infinity); - } - - static release(self: Semaphore, count = 1) { - let previousCount = AtomicValue.load(self._currentCount); - let nextCount: number; - let waitCount: number; - do { - Debug.assert(self._maxCount - previousCount >= count); - nextCount = previousCount + count; - waitCount = AtomicValue.load(self._waitCount); - } - while (previousCount !== (previousCount = AtomicValue.compareExchange(self._currentCount, previousCount, nextCount))); - if (nextCount === 1 && waitCount === 1) { - Condition.notify(self._condition, 1); - } - else if (waitCount > 1) { - Condition.notify(self._condition); - } - return previousCount; - } -} - -function waitScope(count: AtomicValue) { - AtomicValue.increment(count); - return { - [Symbol.dispose]() { - AtomicValue.decrement(count); - } - }; -} \ No newline at end of file diff --git a/src/compiler/threading/spinWait.ts b/src/compiler/threading/spinWait.ts index 0315f3ed68c..304349665d1 100644 --- a/src/compiler/threading/spinWait.ts +++ b/src/compiler/threading/spinWait.ts @@ -11,21 +11,42 @@ export class SpinWait { } spinOnce() { - const count = this._count; - if (cpuCount > 0 && count < 10) { - for (let i = 0; i < count; i++) { - // busy loop, do nothing - } - } - else if ((count - 10) % 20 === 19) { - sleep(1); - } - // else if ((count - 10) % 5 === 4) { - // sleep(1); - // } - else { - sleep(0); - } - this._count = (count + 1) >>> 0; + this._count = spin(this._count); } } + +/** + * Periodically puts the current thread to sleep in an effort to reduce lock contention. + * @param currentCount an unsigned 32-bit integer value used to determine the current spin count. + * @returns the new spin count. + * @example + * ```ts + * let spinCounter = 0; + * while (!trySomeLockFreeOperation()) { + * spinCounter = spin(spinCounter); + * } + * ``` + */ +export function spin(currentCount: number) { + currentCount >>>= 0; + if (cpuCount > 0 && currentCount < 10) { + for (let i = 0; i < currentCount; i++) { + // busy loop, do nothing + } + } + else if ((currentCount - 10) % 20 === 19) { + sleep(2); + } + else if ((currentCount - 10) % 5 === 4) { + sleep(1); + } + else { + sleep(0); + } + // on uint32 overflow, reset to 10 so that we do not retry the busy loop + currentCount = (currentCount + 1) >>> 0; + if (currentCount === 0) { + currentCount = 10; + } + return currentCount; +} \ No newline at end of file diff --git a/src/testRunner/unittests/sharing/concurrentMap.ts b/src/testRunner/unittests/sharing/concurrentMap.ts index f98b87d0787..20023b346d4 100644 --- a/src/testRunner/unittests/sharing/concurrentMap.ts +++ b/src/testRunner/unittests/sharing/concurrentMap.ts @@ -1,65 +1,65 @@ -import { ConcurrentMap, wrapStruct } from "../../_namespaces/ts"; +import { ConcurrentMap, ConcurrentMapWrapper } from "../../_namespaces/ts"; describe("unittests:: sharing:: concurrentMap", () => { it("round trip", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); - assert.equal(map.size(), 0); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); + assert.equal(map.size, 0); assert.isFalse(map.has(1)); map.set(1, "a"); - assert.equal(map.size(), 1); + assert.equal(map.size, 1); assert.isTrue(map.has(1)); map.delete(1); - assert.equal(map.size(), 0); + assert.equal(map.size, 0); assert.isFalse(map.has(1)); }); it("large inserts", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); for (let i = 0; i < 10_000; i++) { map.insert(i, i); } - assert.equal(map.size(), 10_000); + assert.equal(map.size, 10_000); }); it("large deletes", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); for (let i = 0; i < 10_000; i++) { map.insert(i, i); } for (let i = 0; i < 10_000; i++) { map.delete(i); } - assert.equal(map.size(), 0); + assert.equal(map.size, 0); }); describe("exchange", () => { it("sets key to value if missing", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.exchange(1, 2); - assert.equal(map.size(), 1); + assert.equal(map.size, 1); assert.isTrue(map.has(1)); assert.equal(map.get(1), 2); }); it("overwrites key if present and value is not undefined", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.set(1, 2); map.exchange(1, 3); - assert.equal(map.size(), 1); + assert.equal(map.size, 1); assert.isTrue(map.has(1)); assert.equal(map.get(1), 3); }); it("deletes key if present and value is undefined", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.set(1, 2); map.exchange(1, /*value*/ undefined); - assert.equal(map.size(), 0); + assert.equal(map.size, 0); assert.isFalse(map.has(1)); assert.equal(map.get(1), /*expected*/ undefined); }); it("returns undefined if key was not present", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); const result = map.exchange(1, 2); assert.isUndefined(result); }); it("returns previous value if key was present", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.set(1, 2); const result1 = map.exchange(1, 3); const result2 = map.exchange(1, /*value*/ undefined); @@ -69,64 +69,64 @@ describe("unittests:: sharing:: concurrentMap", () => { }); describe("compareExchange", () => { it("sets key to value if expecting undefined and key not present", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.compareExchange(1, /*expectedValue*/ undefined, 2); - assert.equal(map.size(), 1); + assert.equal(map.size, 1); assert.isTrue(map.has(1)); assert.equal(map.get(1), 2); }); it("does not set key to value if expecting undefined and key is present", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.set(1, 2); map.compareExchange(1, /*expectedValue*/ undefined, 3); - assert.equal(map.size(), 1); + assert.equal(map.size, 1); assert.isTrue(map.has(1)); assert.equal(map.get(1), 2); }); it("overwrites key if present, expected value matches, and replacement value is not undefined", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.set(1, 2); map.compareExchange(1, 2, 3); - assert.equal(map.size(), 1); + assert.equal(map.size, 1); assert.isTrue(map.has(1)); assert.equal(map.get(1), 3); }); it("does not overwrite key if present, expected value does not match, and replacement value is not undefined", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.set(1, 2); map.compareExchange(1, 4, 3); - assert.equal(map.size(), 1); + assert.equal(map.size, 1); assert.isTrue(map.has(1)); assert.equal(map.get(1), 2); }); it("deletes key if present, expected value matches, and value is undefined", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.set(1, 2); map.compareExchange(1, 2, /*replacementValue*/ undefined); - assert.equal(map.size(), 0); + assert.equal(map.size, 0); assert.isFalse(map.has(1)); assert.equal(map.get(1), /*expected*/ undefined); }); it("does not delete key if present, expected value does not match, and value is undefined", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.set(1, 2); map.compareExchange(1, 3, /*replacementValue*/ undefined); - assert.equal(map.size(), 1); + assert.equal(map.size, 1); assert.isTrue(map.has(1)); assert.equal(map.get(1), 2); }); it("returns undefined if key was not present and expected value is undefined", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); const result = map.compareExchange(1, /*expectedValue*/ undefined, 2); assert.isUndefined(result); }); it("returns undefined if key was not present and expected value was not undefined", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); const result = map.compareExchange(1, 2, 3); assert.isUndefined(result); }); it("returns previous value if key was present and expected value matches", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.set(1, 2); const result1 = map.compareExchange(1, 2, 3); const result2 = map.compareExchange(1, 3, /*value*/ undefined); @@ -134,7 +134,7 @@ describe("unittests:: sharing:: concurrentMap", () => { assert.equal(result2, 3); }); it("returns previous value if key was present and expected value did not match", () => { - const map = wrapStruct(new ConcurrentMap(), ConcurrentMap); + const map = new ConcurrentMapWrapper(new ConcurrentMap()); map.set(1, 2); const result1 = map.compareExchange(1, 3, 4); const result2 = map.compareExchange(1, 4, /*value*/ undefined);