cleanup unused coordination primitives

This commit is contained in:
Ron Buckton 2023-10-06 17:42:51 -04:00
parent f209dfcedd
commit f563ff252e
No known key found for this signature in database
GPG Key ID: 9ADA0DFD36502AB9
13 changed files with 213 additions and 458 deletions

View File

@ -16,6 +16,7 @@ export * from "../sharing/structs/shareable";
export * from "../sharing/structs/sharedStruct";
export * from "../sharing/structs/taggedStruct";
export * from "../sharing/structs/wrapper";
export * from "../sharing/collections/deque";
export * from "../sharing/collections/hashData";
export * from "../sharing/collections/sharedLinkedList";
export * from "../sharing/collections/concurrentMap";
@ -31,9 +32,7 @@ export * from "../sharing/sharedObjectAllocator";
export * from "../sharing/sharedParserState";
export * from "../sharing/sharedSymbol";
export * from "../threading/condition";
export * from "../threading/countdownEvent";
export * from "../threading/lockable";
export * from "../threading/manualResetEvent";
export * from "../threading/mutex";
export * from "../threading/scopedLock";
export * from "../threading/sharedLockable";

View File

@ -132,8 +132,9 @@ export function zipWith<T, U, V>(sequenceA: Sequence<T>, sequenceB: Sequence<U>,
const result: V[] = [];
const arrayA = getArrayLike(sequenceA);
const arrayB = getArrayLike(sequenceB);
Debug.assertEqual(arrayA.length, arrayB.length);
for (let i = 0; i < arrayA.length; i++) {
const length = arrayA.length;
Debug.assertEqual(length, arrayB.length);
for (let i = 0; i < length; i++) {
result.push(callback(arrayA[i], arrayB[i], i));
}
return result;
@ -289,7 +290,7 @@ export function contains<T>(sequence: Sequence<T> | undefined, value: T, equalit
export function arraysEqual<T>(sequenceA: Sequence<T>, sequenceB: Sequence<T>, equalityComparer: EqualityComparer<T> = equateValues): boolean {
const arrayA = getArrayLike(sequenceA);
const arrayB = getArrayLike(sequenceB);
return arrayA.length === arrayB.length && Array.prototype.every.call(arrayA, (x: T, i: number) => equalityComparer(x, arrayB[i]));
return arrayA.length === arrayB.length && Array.prototype.every.call(arrayA, (x, i) => equalityComparer(x, arrayB[i]));
}
/** @internal */
@ -340,13 +341,22 @@ export function filter<T, U extends T>(array: readonly T[] | undefined, f: (x: T
/** @internal */
export function filter<T, U extends T>(array: readonly T[] | undefined, f: (x: T) => boolean): readonly T[] | undefined;
/** @internal */
export function filter<T>(array: readonly T[] | undefined, f: (x: T) => boolean): readonly T[] | undefined {
export function filter<T extends Shareable, U extends T>(array: SharedArray<T>, f: (x: T) => x is U): SharedArray<U>;
/** @internal */
export function filter<T extends Shareable>(array: SharedArray<T>, f: (x: T) => boolean): SharedArray<T>;
/** @internal */
export function filter<T extends SharedNode, U extends T>(array: SharedNodeArray<T>, f: (x: T) => x is U): SharedArray<U> | SharedNodeArray<U>;
/** @internal */
export function filter<T extends SharedNode>(array: SharedNodeArray<T>, f: (x: T) => boolean): SharedArray<T> | SharedNodeArray<T>;
/** @internal */
export function filter<T>(sequence: Sequence<T> | undefined, f: (x: T) => boolean) {
const array = getArrayLike(sequence);
if (array) {
const len = array.length;
let i = 0;
while (i < len && f(array[i])) i++;
if (i < len) {
const result = array.slice(0, i);
const result = Array.prototype.slice.call(array, 0, i);
i++;
while (i < len) {
const item = array[i];
@ -355,10 +365,10 @@ export function filter<T>(array: readonly T[] | undefined, f: (x: T) => boolean)
}
i++;
}
return result;
return isShareableNonPrimitive(sequence) ? sharedArrayFrom(result) : result;
}
}
return array;
return sequence;
}
/** @internal */
@ -454,7 +464,6 @@ export function flatten<T>(array: T[][] | readonly (T | readonly T[] | undefined
return result;
}
/**
* Maps an array. If the mapped value is an array, it is spread into the result.
*
@ -1479,6 +1488,26 @@ export function arrayFrom<T, U>(iterator: Iterable<T>, map?: (t: T) => U): (T |
return result;
}
/**
* Shims `Array.from`.
*
* @internal
*/
export function sharedArrayFrom<T, U extends Shareable>(iterator: Iterable<T>, map: (t: T) => U): SharedArray<U>;
/** @internal */
export function sharedArrayFrom<T extends Shareable>(iterator: Iterable<T>): SharedArray<T>;
/** @internal */
export function sharedArrayFrom<T extends Shareable>(iterator: Iterable<T> | readonly T[] | SharedArray<T>, map?: (t: T) => T): SharedArray<T> {
const array: ArrayLike<T> = isArray(iterator) || isSharedArray(iterator) ? iterator : arrayFrom(iterator);
const length = array.length;
const sharedArray = new SharedArray<T>(length);
for (let i = 0; i < length; i++) {
const value = array[i];
sharedArray[i] = map ? map(value) : value;
}
return sharedArray;
}
/** @internal */
export function assign<T extends object>(t: T, ...args: (T | undefined)[]) {
for (const arg of args) {

View File

@ -7,6 +7,7 @@ import {
compareValues,
EmitFlags,
every,
FileIncludeKind,
FlowFlags,
FlowLabel,
FlowNode,
@ -519,6 +520,10 @@ export namespace Debug {
return formatEnum(facts, (ts as any).TypeFacts, /*isFlags*/ true);
}
export function formatFileIncludeKind(kind: FileIncludeKind): string {
return formatEnum(kind, (ts as any).FileIncludeKind, /*isflags*/ false);
}
let isDebugInfoEnabled = false;
let flowNodeProto: FlowNodeBase | undefined;

View File

@ -292,6 +292,8 @@ import {
setParentRecursive,
setResolvedModule,
setResolvedTypeReferenceDirective,
Shared,
SharedStructBase,
shouldResolveJsRequire,
skipTrivia,
skipTypeChecking,
@ -420,12 +422,11 @@ export function makeCompilerHostParallel(
Debug.assert(!host.threadPool && !host.requestSourceFile, "Compiler host may already be parallelized.");
const sharedEntryMap = new Map<SharedSourceFileEntry, SourceFile | false>();
const parserState = new SharedParserState();
const parserState = new SharedParserState(threadPool.poolSize);
const savedGetSourceFile = host.getSourceFile;
host.threadPool = threadPool;
host.requestSourceFile = (fileName, languageVersionOrOptions, shouldCreateNewSourceFile) => {
using _measure = performance.measureActivity("Parallel: Request Source File", "beforeParserPausedRequest", "afterParserPausedRequest");
if (!ConcurrentMap.has(parserState.files, fileName)) {
const entry = new SharedSourceFileEntry(
parserState,
@ -440,42 +441,11 @@ export function makeCompilerHostParallel(
threadPool.queueWorkItem("Program.requestSourceFile", entry);
}
}
// // quick check before allocating an entry
// {
// using _ = new SharedLock(parserState.sharedMutex);
// if (SharedMap.has(parserState.files, fileName)) {
// return;
// }
// }
// const entry = new SharedSourceFileEntry(
// parserState,
// !!host.createHash,
// !!setParentNodes,
// fileName,
// typeof languageVersionOrOptions === "object" ? languageVersionOrOptions.languageVersion : languageVersionOrOptions,
// typeof languageVersionOrOptions === "object" ? languageVersionOrOptions.impliedNodeFormat : undefined,
// shouldCreateNewSourceFile);
// using _ = new UniqueLock(parserState.sharedMutex);
// if (!SharedMap.has(parserState.files, fileName)) {
// SharedMap.set(parserState.files, fileName, entry);
// // we inserted the entry, queue a task to parse it
// threadPool.queueWorkItem("Program.requestSourceFile", entry);
// }
};
host.getSourceFile = (fileName, languageVersionOrOptions, onError, shouldCreateNewSourceFile) => {
using _ = performance.measureActivity("Parallel: Get Source File", "beforeParserPausedRead", "afterParserPausedRead");
const sharedFileEntry = ConcurrentMap.get(parserState.files, fileName);
// let sharedFileEntry: SharedSourceFileEntry | undefined;
// {
// using _ = new SharedLock(parserState.sharedMutex);
// sharedFileEntry = SharedMap.get(parserState.files, fileName);
// }
if (sharedFileEntry) {
let file = sharedEntryMap.get(sharedFileEntry);
if (file !== undefined) {
@ -3706,7 +3676,7 @@ export function createProgram(rootNamesOrOptions: readonly string[] | CreateProg
using _ = tracing?.traceActivity(tracing.Phase.Program, "findSourceFile", {
fileName,
isDefaultLib: isDefaultLib || undefined,
fileIncludeKind: (FileIncludeKind as any)[reason.kind],
fileIncludeKind: Debug.formatFileIncludeKind(reason.kind),
});
const path = toPath(fileName);
@ -4039,18 +4009,7 @@ export function createProgram(rootNamesOrOptions: readonly string[] | CreateProg
reason: FileIncludeReason,
currentNodeModulesDepth: number
) {
tracing?.push(tracing.Phase.Program, "processTypeReferenceDirective", { directive: typeReferenceDirective, hasResolved: !!resolution.resolvedTypeReferenceDirective, refKind: reason.kind, refPath: isReferencedFile(reason) ? reason.file : undefined });
yield* processTypeReferenceDirectiveWorker(typeReferenceDirective, mode, resolution, reason, currentNodeModulesDepth);
tracing?.pop();
}
function* processTypeReferenceDirectiveWorker(
typeReferenceDirective: string,
mode: ResolutionMode,
resolution: ResolvedTypeReferenceDirectiveWithFailedLookupLocations,
reason: FileIncludeReason,
currentNodeModulesDepth: number
) {
using _ = tracing?.traceActivity(tracing.Phase.Program, "processTypeReferenceDirective", { directive: typeReferenceDirective, hasResolved: !!resolution.resolvedTypeReferenceDirective, refKind: reason.kind, refPath: isReferencedFile(reason) ? reason.file : undefined });
let hasRecordedResolutionDiagnostics = false;
function* recordResolutionDiagnosticsIfNeeded(needsYield = true) {
if (!hasRecordedResolutionDiagnostics) {

View File

@ -1,13 +1,12 @@
import { iterateValues } from "../../core";
import { Debug } from "../../debug";
import { sys } from "../../sys";
import { AtomicValue } from "../../threading/atomicValue";
import { Mutex } from "../../threading/mutex";
import { ScopedLock } from "../../threading/scopedLock";
import { UniqueLock } from "../../threading/uniqueLock";
import { AtomicValue } from "../atomicValue";
import { Shared, SharedStructBase } from "../structs/sharedStruct";
import { Tag, Tagged } from "../structs/taggedStruct";
import { StructWrapperTypes } from "../structs/wrapper";
import { generateHashSeed, identityHash } from "./hash";
import { getPrime } from "./hashData";
@ -62,35 +61,20 @@ type WILDCARD = typeof WILDCARD;
type EXIST = typeof EXIST;
type NOT_EXIST = typeof NOT_EXIST;
/** @internal */
/**
* A concurrent Map-like object. Based on https://github.com/dotnet/runtime/blob/main/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentDictionary.cs
* @internal
*/
@Shared()
export class ConcurrentMap<K extends NonNullable<Shareable>, V extends NonNullable<Shareable>> extends Tagged(SharedStructBase, Tag.ConcurrentMap) {
declare [StructWrapperTypes]: [
[typeof ConcurrentMap, {
size(): number;
has(key: K): boolean;
get(key: K): V | undefined;
set(key: K, value: V): void;
delete(key: K, expectedValue?: V): V | undefined;
insert(key: K, value: V): boolean;
replace(key: K, expectedValue: V, replacementValue: V): V | undefined;
exchange(key: K, value: V | undefined): V | undefined;
compareExchange(key: K, expectedValue: V | undefined, replacementValue: V | undefined): V | undefined;
clear(): void;
keys(): IterableIterator<K>;
values(): IterableIterator<V>;
entries(): IterableIterator<[K, V]>;
}]
];
@Shared() private readonly _tables: AtomicValue<Tables<K, V>>;
@Shared() private _budget: number;
@Shared() private readonly _growLockArray: boolean;
constructor();
constructor(items: ConcurrentMap<K, V> | Iterable<[K, V]>);
constructor(concurrencyLevel?: number, capacity?: number);
constructor(concurrencyLevel: number, items: ConcurrentMap<K, V> | Iterable<[K, V]>);
constructor(concurrencyLevel: number, capacity: number);
constructor(concurrencyLevelOrItems?: number | ConcurrentMap<K, V> | Iterable<[K, V]>, capacityOrItems?: number | ConcurrentMap<K, V> | Iterable<[K, V]>) {
let concurrencyLevel: number | undefined;
let capacity: number | undefined;
@ -514,3 +498,30 @@ function isLockFree(value: Shareable) {
if (typeof value === "number") return isFinite(value) && (value === (value >>> 0) || value === (value >> 0));
return true;
}
/**
* A non-shared wrapper for {@link ConcurrentMap} objects.
* @internal
*/
export class ConcurrentMapWrapper<K extends NonNullable<Shareable>, V extends NonNullable<Shareable>> {
private self: ConcurrentMap<K, V>;
constructor(map: ConcurrentMap<K, V>) {
this.self = map;
}
get size(): number { return ConcurrentMap.size(this.self); }
has(key: K): boolean { return ConcurrentMap.has(this.self, key); }
get(key: K): V | undefined { return ConcurrentMap.get(this.self, key); }
set(key: K, value: V): this { return ConcurrentMap.set(this.self, key, value), this; }
delete(key: K, expectedValue?: V): V | undefined { return ConcurrentMap.delete(this.self, key, expectedValue); }
insert(key: K, value: V): boolean { return ConcurrentMap.insert(this.self, key, value); }
replace(key: K, expectedValue: V, replacementValue: V): boolean { return ConcurrentMap.replace(this.self, key, expectedValue, replacementValue); }
exchange(key: K, value: V | undefined): V | undefined { return ConcurrentMap.exchange(this.self, key, value); }
compareExchange(key: K, expectedValue: V | undefined, replacementValue: V | undefined): V | undefined { return ConcurrentMap.compareExchange(this.self, key, expectedValue, replacementValue); }
clear(): void { return ConcurrentMap.clear(this.self); }
keys(): IterableIterator<K> { return ConcurrentMap.keys(this.self); }
values(): IterableIterator<V> { return ConcurrentMap.values(this.self); }
entries(): IterableIterator<[K, V]> { return ConcurrentMap.entries(this.self); }
[Symbol.iterator]() { return this.entries(); }
}

View File

@ -8,9 +8,11 @@ import { Shared, SharedStructBase } from "./structs/sharedStruct";
/** @internal */
@Shared()
export class SharedParserState extends SharedStructBase {
// @Shared() sharedMutex = new SharedMutex();
// @Shared() files = new SharedMap<string, SharedSourceFileEntry>();
@Shared() files = new ConcurrentMap<string, SharedSourceFileEntry>();
@Shared() files: ConcurrentMap<string, SharedSourceFileEntry>;
constructor(concurrencyLevel?: number) {
super();
this.files = new ConcurrentMap<string, SharedSourceFileEntry>(concurrencyLevel);
}
}
/** @internal */

View File

@ -1,65 +0,0 @@
import { Debug } from "../../debug";
/**
* Used in a `declare` instance field of a struct type to override the type we use for a wrapper object generated for
* the struct. We do this because we cannot properly infer correct generic instantiations from generic static methods.
* This declaration intentionally does not produce an actual value as it should only be used in an ambient context.
* @internal
*/
export declare const StructWrapperTypes: unique symbol;
/** @internal */
export type StructWrapper<TInstance extends SharedStruct, TStatics extends object> =
TInstance extends { [StructWrapperTypes]: infer T extends [any, any][] } ?
T[number] extends [TStatics, infer U] ? U :
SynthesizedStructWrapper<TInstance, TStatics> :
SynthesizedStructWrapper<TInstance, TStatics>;
type SynthesizedStructInstanceFields<TInstance extends SharedStruct> = {
[P in keyof TInstance & string as Exclude<P, "__tag__" | "__hash__">]: TInstance[P];
};
type SynthesizedStructInstanceMethods<TInstance extends SharedStruct, TStatics extends object> = {
[P in keyof TStatics & string as TStatics[P] extends (self: TInstance, ...args: any) => any ? Exclude<P, `_${string}`> : never]:
TStatics[P] extends (self: TInstance, ...args: infer A) => infer R ? (...args: A) => R : never;
};
type SynthesizedStructWrapper<TInstance extends SharedStruct, TStatics extends object> =
& SynthesizedStructInstanceFields<TInstance>
& SynthesizedStructInstanceMethods<TInstance, TStatics>
;
/**
* Generates a simple, shallow proxy for a shared struct that treats static methods as instance methods with the first
* parameter bound to the instance.
* @internal
*/
export function wrapStruct<TInstance extends SharedStruct, TStatics extends object>(instance: TInstance, statics: TStatics): StructWrapper<TInstance, TStatics> {
const wrapper = {} as any;
for (const key of Reflect.ownKeys(instance) as Iterable<keyof TInstance>) {
if (typeof key !== "string") continue;
if (key === "__tag__" || key == "__hash__") continue;
Object.defineProperty(wrapper, key, {
enumerable: true,
configurable: false,
get: () => instance[key],
set: (value: TInstance[typeof key]) => instance[key] = value
});
}
for (const key of Reflect.ownKeys(statics) as Iterable<keyof TStatics>) {
if (typeof key !== "string") continue;
if (key.startsWith("_")) continue;
const value = statics[key];
if (typeof value !== "function") continue;
if (Object.prototype.hasOwnProperty.call(value, key)) Debug.fail(`static method name '${key}' conflicts with same named instance field`);
Object.defineProperty(wrapper, key, {
enumerable: false,
configurable: false,
writable: false,
value: value.bind(/*thisArg*/ undefined, instance)
});
}
return wrapper as StructWrapper<TInstance, TStatics>;
}

View File

@ -1,8 +1,17 @@
import { Shared, SharedStructBase } from "./structs/sharedStruct";
import { spin } from "./spinWait";
import { Shared, SharedStructBase } from "../sharing/structs/sharedStruct";
/** @internal */
/**
* Atomically read and write a shared value. While this isn't strictly necessary as `Atomics` methods can be used
* directly, it provides a convenient mechanism for updating fields marked with the TypeScript `private` keyword.
* @internal
*/
@Shared()
export class AtomicValue<T extends Shareable> extends SharedStructBase {
/**
* The held value. It is not recommended to read or write from this field directly as such operations are neither
* atomic nor sequentially consistent. Instead, you should use {@link AtomicValue.load} and {@link AtomicValue.store}.
*/
@Shared() unsafeValue: T;
constructor(value: T) {
@ -10,18 +19,36 @@ export class AtomicValue<T extends Shareable> extends SharedStructBase {
this.unsafeValue = value;
}
/**
* Atomically loads and returns the underlying value. This operation is guaranteed to be sequentially consistent
* across multiple threads/agents.
*/
static load<T extends Shareable>(self: AtomicValue<T>) {
return Atomics.load(self, "unsafeValue");
}
/**
* Atomically replaces the underlying value with the provided value. This operation is guaranteed to be sequentially
* consistent across multiple threads/agents.
*/
static store<T extends Shareable>(self: AtomicValue<T>, value: T) {
return Atomics.store(self, "unsafeValue", value);
Atomics.store(self, "unsafeValue", value);
}
/**
* Atomically replaces the underlying value with the provided value, returning the previous value as a
* read-modify-write operation. This operation is guaranteed to be sequentially consistent across multiple
* threads/agents.
*/
static exchange<T extends Shareable>(self: AtomicValue<T>, value: T) {
return Atomics.exchange(self, "unsafeValue", value);
}
/**
* Atomically replaces the underlying value with the provided replacement value if the underling value and expected
* value are the same, returning the previous value as a read-modify-write operation. This operation is guaranteed
* to be sequentially consistent across multiple threads/agents.
*/
static compareExchange<T extends Shareable>(self: AtomicValue<T>, expectedValue: T, replacementValue: T) {
return Atomics.compareExchange(self, "unsafeValue", expectedValue, replacementValue);
}
@ -30,33 +57,56 @@ export class AtomicValue<T extends Shareable> extends SharedStructBase {
return expectedValue === AtomicValue.compareExchange(self, expectedValue, replacementValue);
}
/**
* Performs 32-bit signed integer addition via an atomic read-modify-write operation.
* @returns the value prior to addition.
*/
static add(self: AtomicValue<number>, value: number) {
let spinCounter = 0;
let currentValue = AtomicValue.load(self);
while (!AtomicValue.compareAndSet(self, currentValue, (currentValue + value) >> 0)) {
while (currentValue !== AtomicValue.compareExchange(self, currentValue, (currentValue + value) >> 0)) {
spinCounter = spin(spinCounter);
currentValue = AtomicValue.load(self);
}
return currentValue;
}
/**
* Performs 32-bit signed integer subtraction via an atomic read-modify-write operation.
* @returns the value prior to subtraction.
*/
static sub(self: AtomicValue<number>, value: number) {
let spinCounter = 0;
let currentValue = AtomicValue.load(self);
while (!AtomicValue.compareAndSet(self, currentValue, (currentValue - value) >> 0)) {
while (currentValue !== AtomicValue.compareExchange(self, currentValue, (currentValue - value) >> 0)) {
spinCounter = spin(spinCounter);
currentValue = AtomicValue.load(self);
}
return currentValue;
}
/**
* Atomically increments a 32-bit signed integer value.
* @returns the value prior to subtraction.
*/
static increment(self: AtomicValue<number>) {
return AtomicValue.add(self, 1);
}
/**
* Atomically decrements a 32-bit signed integer value.
* @returns the value prior to subtraction.
*/
static decrement(self: AtomicValue<number>) {
return AtomicValue.sub(self, 1);
}
}
/** @internal */
export class AtomicRef<T extends Shareable> {
/**
* A non-shared wrapper for an {@link AtomicValue}.
* @internal
*/
export class AtomicRef<T extends Shareable> implements Disposable {
private _value: AtomicValue<T> | undefined;
constructor(value: AtomicValue<T> | undefined) {

View File

@ -1,118 +0,0 @@
import { Condition, Debug, isTaggedStruct, Mutex, Shared, SharedStructBase, Tag, Tagged, UniqueLock } from "../_namespaces/ts";
const MAX_INT32 = 2 ** 31 - 1;
/** @internal */
@Shared()
export class CountdownEvent extends Tagged(SharedStructBase, Tag.CountdownEvent) {
@Shared() private _mutex: Mutex | undefined = new Mutex();
@Shared() private _condition: Condition | undefined = new Condition();
@Shared() private _signaled = false;
@Shared() private _initialCount: number;
@Shared() private _remainingCount: number = 0;
constructor(initialCount = 0) {
super();
initialCount |= 0;
Debug.assert(initialCount >= 0, "initialCount out of range");
this._initialCount = initialCount;
this._remainingCount = initialCount;
this._signaled = initialCount === 0;
}
static initialCount(self: CountdownEvent) {
return self._initialCount;
}
static remainingCount(self: CountdownEvent) {
return self._remainingCount;
}
static isSet(self: CountdownEvent) {
return self._remainingCount === 0;
}
static add(self: CountdownEvent, count?: number) {
Debug.assert(CountdownEvent.tryAdd(self, count), "event is already signaled");
}
static tryAdd(self: CountdownEvent, count = 1) {
count |= 0;
Debug.assert(count >= 1, "count out of range");
Debug.assert(self._mutex, "object disposed");
using _ = new UniqueLock(self._mutex);
if (self._remainingCount === 0) return false;
Debug.assert(self._remainingCount <= MAX_INT32 - count, "count out of range");
self._remainingCount += count;
return true;
}
static reset(self: CountdownEvent, count?: number) {
if (count !== undefined) {
count |= 0;
Debug.assert(count >= 0, "count out of range");
}
Debug.assert(self._mutex && self._condition, "object disposed");
using _ = new UniqueLock(self._mutex);
count ??= self._initialCount;
self._remainingCount = count;
self._initialCount = count;
if (count === 0) {
if (!self._signaled) {
self._signaled = true;
Condition.notify(self._condition);
}
}
else {
self._signaled = false;
}
}
static signal(self: CountdownEvent, count = 1) {
count |= 0;
Debug.assert(count >= 1, "count out of range");
Debug.assert(self._mutex && self._condition, "object disposed");
using _ = new UniqueLock(self._mutex);
Debug.assert(self._remainingCount >= count, "count out of range");
self._remainingCount = self._remainingCount - count;
if (self._remainingCount === 0) {
if (!self._signaled) {
self._signaled = true;
Condition.notify(self._condition);
}
return true;
}
return false;
}
static wait(self: CountdownEvent, timeout?: number) {
if (timeout !== undefined) {
timeout = Math.max(0, timeout | 0);
}
Debug.assert(self._mutex && self._condition, "object disposed");
if (CountdownEvent.isSet(self)) {
return true;
}
using lck = new UniqueLock(self._mutex);
if (timeout !== undefined) {
return Condition.waitFor(self._condition, lck, timeout) !== "timed-out";
}
else {
Condition.wait(self._condition, lck);
return true;
}
}
static close(self: CountdownEvent): void {
self._mutex = undefined;
self._condition = undefined;
}
static [Symbol.hasInstance](value: unknown): value is CountdownEvent {
return isTaggedStruct(value, Tag.CountdownEvent);
}
}

View File

@ -1,45 +0,0 @@
import { Shared, SharedStructBase } from "../sharing/structs/sharedStruct";
import { isTaggedStruct, Tag, Tagged } from "../sharing/structs/taggedStruct";
/** @internal */
@Shared()
export class ManualResetEvent extends Tagged(SharedStructBase, Tag.ManualResetEvent) {
@Shared() private mutex = new Atomics.Mutex();
@Shared() private condition = new Atomics.Condition();
@Shared() private signaled = false;
static isSet(self: ManualResetEvent) {
return self.signaled;
}
static set(self: ManualResetEvent) {
let result = false;
Atomics.Mutex.tryLock(self.mutex, () => {
if (!self.signaled) {
self.signaled = true;
Atomics.Condition.notify(self.condition);
result = true;
}
});
return result;
}
static reset(self: ManualResetEvent) {
let result = false;
Atomics.Mutex.tryLock(self.mutex, () => {
if (self.signaled) {
self.signaled = false;
result = true;
}
});
return result;
}
static wait(self: ManualResetEvent, timeout?: number) {
return Atomics.Mutex.lock(self.mutex, () => Atomics.Condition.wait(self.condition, self.mutex, timeout));
}
static [Symbol.hasInstance](value: unknown): value is ManualResetEvent {
return isTaggedStruct(value, Tag.ManualResetEvent);
}
}

View File

@ -1,93 +0,0 @@
import { Condition, Debug, Mutex, Shared, SharedStructBase, Tag, Tagged, UniqueLock } from "../_namespaces/ts";
import { AtomicValue } from "../sharing/atomicValue";
/** @internal */
@Shared()
export class Semaphore extends Tagged(SharedStructBase, Tag.Semaphore) {
@Shared() private readonly _maxCount: number;
@Shared() private readonly _currentCount: AtomicValue<number>;
@Shared() private readonly _waitCount = new AtomicValue<number>(0);
@Shared() private readonly _mutex = new Mutex();
@Shared() private readonly _condition = new Condition();
constructor(initialCount: number, maxCount: number = 2 ** 31 - 1) {
Debug.assert(initialCount === (initialCount | 0) && initialCount >= 0);
Debug.assert(maxCount >= 0 && maxCount >= initialCount);
super();
this._maxCount = maxCount;
this._currentCount = new AtomicValue(initialCount);
}
static count(self: Semaphore) {
return AtomicValue.load(self._currentCount);
}
static tryAcquire(self: Semaphore, timeout?: number) {
if (timeout !== undefined) {
if (isNaN(timeout) || isFinite(timeout)) timeout |= 0;
if (timeout < 0) timeout = 0;
}
// first, try to acquire a count lock-free
let count = AtomicValue.load(self._currentCount);
if (count > 0 && count === (count = AtomicValue.compareExchange(self._currentCount, count, count - 1))) {
return true;
}
if (timeout === undefined) {
return false;
}
const start = Date.now();
let remaining: number;
// if that fails, take a lock and wait for a spot to open up
using lck = new UniqueLock(self._mutex);
using _waiter = waitScope(self._waitCount);
do {
remaining = timeout - (Date.now() - start);
if (count <= 0) {
if (Condition.waitFor(self._condition, lck, remaining) === "timed-out") {
return false;
}
count = AtomicValue.load(self._currentCount);
}
else if (count === (count = AtomicValue.compareExchange(self._currentCount, count, count - 1))) {
return true;
}
}
while (remaining >= 0);
return false;
}
static acquire(self: Semaphore) {
Semaphore.tryAcquire(self, Infinity);
}
static release(self: Semaphore, count = 1) {
let previousCount = AtomicValue.load(self._currentCount);
let nextCount: number;
let waitCount: number;
do {
Debug.assert(self._maxCount - previousCount >= count);
nextCount = previousCount + count;
waitCount = AtomicValue.load(self._waitCount);
}
while (previousCount !== (previousCount = AtomicValue.compareExchange(self._currentCount, previousCount, nextCount)));
if (nextCount === 1 && waitCount === 1) {
Condition.notify(self._condition, 1);
}
else if (waitCount > 1) {
Condition.notify(self._condition);
}
return previousCount;
}
}
function waitScope(count: AtomicValue<number>) {
AtomicValue.increment(count);
return {
[Symbol.dispose]() {
AtomicValue.decrement(count);
}
};
}

View File

@ -11,21 +11,42 @@ export class SpinWait {
}
spinOnce() {
const count = this._count;
if (cpuCount > 0 && count < 10) {
for (let i = 0; i < count; i++) {
// busy loop, do nothing
}
}
else if ((count - 10) % 20 === 19) {
sleep(1);
}
// else if ((count - 10) % 5 === 4) {
// sleep(1);
// }
else {
sleep(0);
}
this._count = (count + 1) >>> 0;
this._count = spin(this._count);
}
}
/**
* Periodically puts the current thread to sleep in an effort to reduce lock contention.
* @param currentCount an unsigned 32-bit integer value used to determine the current spin count.
* @returns the new spin count.
* @example
* ```ts
* let spinCounter = 0;
* while (!trySomeLockFreeOperation()) {
* spinCounter = spin(spinCounter);
* }
* ```
*/
export function spin(currentCount: number) {
currentCount >>>= 0;
if (cpuCount > 0 && currentCount < 10) {
for (let i = 0; i < currentCount; i++) {
// busy loop, do nothing
}
}
else if ((currentCount - 10) % 20 === 19) {
sleep(2);
}
else if ((currentCount - 10) % 5 === 4) {
sleep(1);
}
else {
sleep(0);
}
// on uint32 overflow, reset to 10 so that we do not retry the busy loop
currentCount = (currentCount + 1) >>> 0;
if (currentCount === 0) {
currentCount = 10;
}
return currentCount;
}

View File

@ -1,65 +1,65 @@
import { ConcurrentMap, wrapStruct } from "../../_namespaces/ts";
import { ConcurrentMap, ConcurrentMapWrapper } from "../../_namespaces/ts";
describe("unittests:: sharing:: concurrentMap", () => {
it("round trip", () => {
const map = wrapStruct(new ConcurrentMap<number, string>(), ConcurrentMap);
assert.equal(map.size(), 0);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, string>());
assert.equal(map.size, 0);
assert.isFalse(map.has(1));
map.set(1, "a");
assert.equal(map.size(), 1);
assert.equal(map.size, 1);
assert.isTrue(map.has(1));
map.delete(1);
assert.equal(map.size(), 0);
assert.equal(map.size, 0);
assert.isFalse(map.has(1));
});
it("large inserts", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
for (let i = 0; i < 10_000; i++) {
map.insert(i, i);
}
assert.equal(map.size(), 10_000);
assert.equal(map.size, 10_000);
});
it("large deletes", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
for (let i = 0; i < 10_000; i++) {
map.insert(i, i);
}
for (let i = 0; i < 10_000; i++) {
map.delete(i);
}
assert.equal(map.size(), 0);
assert.equal(map.size, 0);
});
describe("exchange", () => {
it("sets key to value if missing", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.exchange(1, 2);
assert.equal(map.size(), 1);
assert.equal(map.size, 1);
assert.isTrue(map.has(1));
assert.equal(map.get(1), 2);
});
it("overwrites key if present and value is not undefined", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.set(1, 2);
map.exchange(1, 3);
assert.equal(map.size(), 1);
assert.equal(map.size, 1);
assert.isTrue(map.has(1));
assert.equal(map.get(1), 3);
});
it("deletes key if present and value is undefined", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.set(1, 2);
map.exchange(1, /*value*/ undefined);
assert.equal(map.size(), 0);
assert.equal(map.size, 0);
assert.isFalse(map.has(1));
assert.equal(map.get(1), /*expected*/ undefined);
});
it("returns undefined if key was not present", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
const result = map.exchange(1, 2);
assert.isUndefined(result);
});
it("returns previous value if key was present", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.set(1, 2);
const result1 = map.exchange(1, 3);
const result2 = map.exchange(1, /*value*/ undefined);
@ -69,64 +69,64 @@ describe("unittests:: sharing:: concurrentMap", () => {
});
describe("compareExchange", () => {
it("sets key to value if expecting undefined and key not present", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.compareExchange(1, /*expectedValue*/ undefined, 2);
assert.equal(map.size(), 1);
assert.equal(map.size, 1);
assert.isTrue(map.has(1));
assert.equal(map.get(1), 2);
});
it("does not set key to value if expecting undefined and key is present", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.set(1, 2);
map.compareExchange(1, /*expectedValue*/ undefined, 3);
assert.equal(map.size(), 1);
assert.equal(map.size, 1);
assert.isTrue(map.has(1));
assert.equal(map.get(1), 2);
});
it("overwrites key if present, expected value matches, and replacement value is not undefined", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.set(1, 2);
map.compareExchange(1, 2, 3);
assert.equal(map.size(), 1);
assert.equal(map.size, 1);
assert.isTrue(map.has(1));
assert.equal(map.get(1), 3);
});
it("does not overwrite key if present, expected value does not match, and replacement value is not undefined", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.set(1, 2);
map.compareExchange(1, 4, 3);
assert.equal(map.size(), 1);
assert.equal(map.size, 1);
assert.isTrue(map.has(1));
assert.equal(map.get(1), 2);
});
it("deletes key if present, expected value matches, and value is undefined", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.set(1, 2);
map.compareExchange(1, 2, /*replacementValue*/ undefined);
assert.equal(map.size(), 0);
assert.equal(map.size, 0);
assert.isFalse(map.has(1));
assert.equal(map.get(1), /*expected*/ undefined);
});
it("does not delete key if present, expected value does not match, and value is undefined", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.set(1, 2);
map.compareExchange(1, 3, /*replacementValue*/ undefined);
assert.equal(map.size(), 1);
assert.equal(map.size, 1);
assert.isTrue(map.has(1));
assert.equal(map.get(1), 2);
});
it("returns undefined if key was not present and expected value is undefined", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
const result = map.compareExchange(1, /*expectedValue*/ undefined, 2);
assert.isUndefined(result);
});
it("returns undefined if key was not present and expected value was not undefined", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
const result = map.compareExchange(1, 2, 3);
assert.isUndefined(result);
});
it("returns previous value if key was present and expected value matches", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.set(1, 2);
const result1 = map.compareExchange(1, 2, 3);
const result2 = map.compareExchange(1, 3, /*value*/ undefined);
@ -134,7 +134,7 @@ describe("unittests:: sharing:: concurrentMap", () => {
assert.equal(result2, 3);
});
it("returns previous value if key was present and expected value did not match", () => {
const map = wrapStruct(new ConcurrentMap<number, number>(), ConcurrentMap);
const map = new ConcurrentMapWrapper(new ConcurrentMap<number, number>());
map.set(1, 2);
const result1 = map.compareExchange(1, 3, 4);
const result2 = map.compareExchange(1, 4, /*value*/ undefined);