/**
|
* @license
|
* Copyright 2018 Google LLC. All Rights Reserved.
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*
|
* =============================================================================
|
*/
|
import * as tf from '@tensorflow/tfjs-core';
|
import * as seedrandom from 'seedrandom';
|
import { deepClone } from '../util/deep_clone';
|
import { deepMapAndAwaitAll, deepZip, zipToList } from '../util/deep_map';
|
import { GrowingRingBuffer } from '../util/growing_ring_buffer';
|
import { RingBuffer } from '../util/ring_buffer';
|
// Here we implement a simple asynchronous iterator.
|
// This lets us avoid using either third-party stream libraries or
|
// recent TypeScript language support requiring polyfills.
|
/**
|
* Create a `LazyIterator` from an array of items.
|
*/
|
export function iteratorFromItems(items) {
|
return new ArrayIterator(items);
|
}
|
/**
|
* Create a `LazyIterator` of incrementing integers.
|
*/
|
export function iteratorFromIncrementing(start) {
|
let i = start;
|
return iteratorFromFunction(() => ({ value: i++, done: false }));
|
}
|
/**
|
* Create a `LazyIterator` from a function.
|
*
|
* ```js
|
* let i = -1;
|
* const func = () =>
|
* ++i < 5 ? {value: i, done: false} : {value: null, done: true};
|
* const iter = tf.data.iteratorFromFunction(func);
|
* await iter.forEachAsync(e => console.log(e));
|
* ```
|
*
|
* @param func A function that produces data on each call.
|
*/
|
export function iteratorFromFunction(func) {
|
return new FunctionCallIterator(func);
|
}
|
/**
|
* Create a `LazyIterator` by concatenating underlying streams, which are
|
* themselves provided as a stream.
|
*
|
* This can also be thought of as a "stream flatten" operation.
|
*
|
* @param baseIterators A stream of streams to be concatenated.
|
* @param baseErrorHandler An optional function that can intercept `Error`s
|
* raised during a `next()` call on the base stream. This function can decide
|
* whether the error should be propagated, whether the error should be
|
* ignored, or whether the base stream should be terminated.
|
*/
|
export function iteratorFromConcatenated(baseIterators, baseErrorHandler) {
|
return new ChainedIterator(baseIterators, baseErrorHandler);
|
}
|
/**
|
* Create a `LazyIterator` by concatenating streams produced by calling a
|
* stream-generating function a given number of times.
|
*
|
* Since a `LazyIterator` is read-once, it cannot be repeated, but this
|
* function can be used to achieve a similar effect:
|
*
|
* LazyIterator.ofConcatenatedFunction(() => new MyIterator(), 6);
|
*
|
* @param iteratorFunc: A function that produces a new stream on each call.
|
* @param count: The number of times to call the function.
|
* @param baseErrorHandler An optional function that can intercept `Error`s
|
* raised during a `next()` call on the base stream. This function can decide
|
* whether the error should be propagated, whether the error should be
|
* ignored, or whether the base stream should be terminated.
|
*/
|
export function iteratorFromConcatenatedFunction(iteratorFunc, count, baseErrorHandler) {
|
return iteratorFromConcatenated(iteratorFromFunction(iteratorFunc).take(count), baseErrorHandler);
|
}
|
/**
|
* Create a `LazyIterator` by zipping together an array, dict, or nested
|
* structure of `LazyIterator`s (and perhaps additional constants).
|
*
|
* The underlying streams must provide elements in a consistent order such
|
* that they correspond.
|
*
|
* Typically, the underlying streams should have the same number of
|
* elements. If they do not, the behavior is determined by the
|
* `mismatchMode` argument.
|
*
|
* The nested structure of the `iterators` argument determines the
|
* structure of elements in the resulting iterator.
|
*
|
* @param iterators: An array or object containing LazyIterators at the
|
* leaves.
|
* @param mismatchMode: Determines what to do when one underlying iterator
|
* is exhausted before the others. `ZipMismatchMode.FAIL` (the default)
|
* causes an error to be thrown in this case. `ZipMismatchMode.SHORTEST`
|
* causes the zipped iterator to terminate with the furst underlying
|
* streams, so elements remaining on the longer streams are ignored.
|
* `ZipMismatchMode.LONGEST` causes the zipped stream to continue, filling
|
* in nulls for the exhausted streams, until all streams are exhausted.
|
*/
|
export function iteratorFromZipped(iterators, mismatchMode = ZipMismatchMode.FAIL) {
|
return new ZipIterator(iterators, mismatchMode);
|
}
|
/**
|
* An asynchronous iterator, providing lazy access to a potentially
|
* unbounded stream of elements.
|
*
|
* Iterator can be obtained from a dataset:
|
* `const iter = await dataset.iterator();`
|
*/
|
export class LazyIterator {
|
/**
|
* Collect all remaining elements of a bounded stream into an array.
|
* Obviously this will succeed only for small streams that fit in memory.
|
* Useful for testing.
|
*
|
* @returns A Promise for an array of stream elements, which will resolve
|
* when the stream is exhausted.
|
*/
|
async toArray() {
|
const result = [];
|
let x = await this.next();
|
while (!x.done) {
|
result.push(x.value);
|
x = await this.next();
|
}
|
return result;
|
}
|
/**
|
* Collect all elements of this dataset into an array with prefetching 100
|
* elements. This is useful for testing, because the prefetch changes the
|
* order in which the Promises are resolved along the processing pipeline.
|
* This may help expose bugs where results are dependent on the order of
|
* Promise resolution rather than on the logical order of the stream (i.e.,
|
* due to hidden mutable state).
|
*
|
* @returns A Promise for an array of stream elements, which will resolve
|
* when the stream is exhausted.
|
*/
|
async toArrayForTest() {
|
const stream = this.prefetch(100);
|
const result = [];
|
let x = await stream.next();
|
while (!x.done) {
|
result.push(x.value);
|
x = await stream.next();
|
}
|
return result;
|
}
|
/**
|
* Draw items from the stream until it is exhausted.
|
*
|
* This can be useful when the stream has side effects but no output. In
|
* that case, calling this function guarantees that the stream will be
|
* fully processed.
|
*/
|
async resolveFully() {
|
let x = await this.next();
|
while (!x.done) {
|
x = await this.next();
|
}
|
}
|
/**
|
* Draw items from the stream until it is exhausted, or a predicate fails.
|
*
|
* This can be useful when the stream has side effects but no output. In
|
* that case, calling this function guarantees that the stream will be
|
* fully processed.
|
*/
|
async resolveWhile(predicate) {
|
let x = await this.next();
|
let shouldContinue = predicate(x.value);
|
while ((!x.done) && shouldContinue) {
|
x = await this.next();
|
shouldContinue = predicate(x.value);
|
}
|
}
|
/**
|
* Handles errors thrown on this stream using a provided handler function.
|
*
|
* @param handler A function that handles any `Error` thrown during a `next()`
|
* call and returns true if the stream should continue (dropping the failed
|
* call) or false if the stream should quietly terminate. If the handler
|
* itself throws (or rethrows) an `Error`, that will be propagated.
|
*
|
* @returns A `LazyIterator` of elements passed through from upstream,
|
* possibly filtering or terminating on upstream `next()` calls that
|
* throw an `Error`.
|
*/
|
handleErrors(handler) {
|
return new ErrorHandlingLazyIterator(this, handler);
|
}
|
// TODO(soergel): Implement reduce() etc.
|
/**
|
* Filters this stream according to `predicate`.
|
*
|
* @param predicate A function mapping a stream element to a boolean or a
|
* `Promise` for one.
|
*
|
* @returns A `LazyIterator` of elements for which the predicate was true.
|
*/
|
filter(predicate) {
|
return new FilterIterator(this, predicate);
|
}
|
/**
|
* Maps this stream through a 1-to-1 transform.
|
*
|
* @param transform A function mapping a stream element to a transformed
|
* element.
|
*
|
* @returns A `LazyIterator` of transformed elements.
|
*/
|
map(transform) {
|
return new MapIterator(this, transform);
|
}
|
/**
|
* Maps this stream through an async 1-to-1 transform.
|
*
|
* @param transform A function mapping a stream element to a `Promise` for a
|
* transformed stream element.
|
*
|
* @returns A `LazyIterator` of transformed elements.
|
*/
|
mapAsync(transform) {
|
return new AsyncMapIterator(this, transform);
|
}
|
/**
|
* Maps this stream through a 1-to-1 transform, forcing serial execution.
|
*
|
* @param transform A function mapping a stream element to a transformed
|
* element.
|
*
|
* @returns A `LazyIterator` of transformed elements.
|
*/
|
serialMapAsync(transform) {
|
return new AsyncMapIterator(this, transform).serial();
|
}
|
/**
|
* Maps this stream through a 1-to-many transform.
|
*
|
* @param transform A function mapping a stream element to an array of
|
* transformed elements.
|
*
|
* @returns A `DataStream` of transformed elements.
|
*/
|
flatmap(transform) {
|
return new FlatmapIterator(this, transform);
|
}
|
/**
|
* Apply a function to every element of the stream.
|
*
|
* @param f A function to apply to each stream element.
|
*/
|
async forEachAsync(f) {
|
return this.map(f).resolveFully();
|
}
|
/**
|
* Apply a function to every element of the stream, forcing serial execution.
|
*
|
* @param f A function to apply to each stream element. Should return 'true'
|
* to indicate that the stream should continue, or 'false' to cause it to
|
* terminate.
|
*/
|
async serialForEach(f) {
|
return this.serialMapAsync(f).resolveWhile(x => (x === true));
|
}
|
/**
|
* Groups elements into batches, represented as arrays of elements.
|
*
|
* We can think of the elements of this iterator as 'rows' (even if they are
|
* nested structures). By the same token, consecutive values for a given
|
* key within the elements form a 'column'. This matches the usual sense of
|
* 'row' and 'column' when processing tabular data (e.g., parsing a CSV).
|
*
|
* Thus, "Row-major" means that the resulting batch is simply a collection of
|
* rows: `[row1, row2, row3, ...]`. This is contrast to the column-major
|
* form, which is needed for vectorized computation.
|
*
|
* @param batchSize The number of elements desired per batch.
|
* @param smallLastBatch Whether to emit the final batch when it has fewer
|
* than batchSize elements. Default true.
|
* @returns A `LazyIterator` of batches of elements, represented as arrays
|
* of the original element type.
|
*/
|
rowMajorBatch(batchSize, smallLastBatch = true) {
|
return new RowMajorBatchIterator(this, batchSize, smallLastBatch);
|
}
|
/**
|
* Groups elements into batches, represented in column-major form.
|
*
|
* We can think of the elements of this iterator as 'rows' (even if they are
|
* nested structures). By the same token, consecutive values for a given
|
* key within the elements form a 'column'. This matches the usual sense of
|
* 'row' and 'column' when processing tabular data (e.g., parsing a CSV).
|
*
|
* Thus, "column-major" means that the resulting batch is a (potentially
|
* nested) structure representing the columns. Each column entry, then,
|
* contains a collection of the values found in that column for a range of
|
* input elements. This representation allows for vectorized computation, in
|
* contrast to the row-major form.
|
*
|
* The inputs should all have the same nested structure (i.e., of arrays and
|
* dicts). The result is a single object with the same nested structure,
|
* where the leaves are arrays collecting the values of the inputs at that
|
* location (or, optionally, the result of a custom function applied to those
|
* arrays).
|
*
|
* @param batchSize The number of elements desired per batch.
|
* @param smallLastBatch Whether to emit the final batch when it has fewer
|
* than batchSize elements. Default true.
|
* @param zipFn: (optional) A function that expects an array of elements at a
|
* single node of the object tree, and returns a `DeepMapResult`. The
|
* `DeepMapResult` either provides a result value for that node (i.e.,
|
* representing the subtree), or indicates that the node should be processed
|
* recursively. The default zipFn recurses as far as possible and places
|
* arrays at the leaves.
|
* @returns A `LazyIterator` of batches of elements, represented as an object
|
* with collections at the leaves.
|
*/
|
columnMajorBatch(batchSize, smallLastBatch = true,
|
// tslint:disable-next-line:no-any
|
zipFn = zipToList) {
|
// First collect the desired number of input elements as a row-major batch.
|
const rowBatches = this.rowMajorBatch(batchSize, smallLastBatch);
|
// Now 'rotate' or 'pivot' the data, collecting all values from each column
|
// in the batch (i.e., for each key within the elements) into an array.
|
return rowBatches.map(x => deepZip(x, zipFn));
|
}
|
/**
|
* Concatenate this `LazyIterator` with another.
|
*
|
* @param iterator A `LazyIterator` to be concatenated onto this one.
|
* @param baseErrorHandler An optional function that can intercept `Error`s
|
* raised during a `next()` call on the base stream. This function can
|
* decide whether the error should be propagated, whether the error should
|
* be ignored, or whether the base stream should be terminated.
|
* @returns A `LazyIterator`.
|
*/
|
concatenate(iterator, baseErrorHandler) {
|
return new ChainedIterator(iteratorFromItems([this, iterator]), baseErrorHandler);
|
}
|
/**
|
* Limits this stream to return at most `count` items.
|
*
|
* @param count The maximum number of items to provide from the stream. If
|
* a negative or undefined value is given, the entire stream is returned
|
* unaltered.
|
*/
|
take(count) {
|
if (count < 0 || count == null) {
|
return this;
|
}
|
return new TakeIterator(this, count);
|
}
|
/**
|
* Skips the first `count` items in this stream.
|
*
|
* @param count The number of items to skip. If a negative or undefined
|
* value is given, the entire stream is returned unaltered.
|
*/
|
skip(count) {
|
if (count < 0 || count == null) {
|
return this;
|
}
|
return new SkipIterator(this, count);
|
}
|
/**
|
* Prefetch the first `bufferSize` items in this stream.
|
*
|
* Note this prefetches Promises, but makes no guarantees about when those
|
* Promises resolve.
|
*
|
* @param bufferSize: An integer specifying the number of elements to be
|
* prefetched.
|
*/
|
prefetch(bufferSize) {
|
return new PrefetchIterator(this, bufferSize);
|
}
|
// TODO(soergel): deep sharded shuffle, where supported
|
/**
|
* Randomly shuffles the elements of this stream.
|
*
|
* @param bufferSize: An integer specifying the number of elements from
|
* this stream from which the new stream will sample.
|
* @param seed: (Optional.) An integer specifying the random seed that
|
* will be used to create the distribution.
|
*/
|
shuffle(windowSize, seed) {
|
return new ShuffleIterator(this, windowSize, seed);
|
}
|
/**
|
* Force an iterator to execute serially: each next() call will await the
|
* prior one, so that they cannot execute concurrently.
|
*/
|
serial() {
|
return new SerialIterator(this);
|
}
|
}
|
// ============================================================================
|
// The following private classes serve to implement the chainable methods
|
// on LazyIterator. Unfortunately they can't be placed in separate files,
|
// due to resulting trouble with circular imports.
|
// ============================================================================
|
// Iterators that just extend LazyIterator directly
|
// ============================================================================
|
class ArrayIterator extends LazyIterator {
|
constructor(items) {
|
super();
|
this.items = items;
|
this.trav = 0;
|
}
|
summary() {
|
return `Array of ${this.items.length} items`;
|
}
|
async next() {
|
if (this.trav >= this.items.length) {
|
return { value: null, done: true };
|
}
|
const item = this.items[this.trav];
|
this.trav++;
|
return { value: deepClone(item), done: false };
|
}
|
}
|
class FunctionCallIterator extends LazyIterator {
|
constructor(nextFn) {
|
super();
|
this.nextFn = nextFn;
|
}
|
summary() {
|
return `Function call`;
|
}
|
async next() {
|
try {
|
return this.nextFn();
|
}
|
catch (e) {
|
// Modify the error message but leave the stack trace intact
|
e.message =
|
`Error thrown while iterating through a dataset: ${e.message}`;
|
throw e;
|
}
|
}
|
}
|
class SerialIterator extends LazyIterator {
|
constructor(upstream) {
|
super();
|
this.upstream = upstream;
|
this.lastRead = Promise.resolve({ value: null, done: false });
|
}
|
summary() {
|
return `${this.upstream.summary()} -> Serial`;
|
}
|
async next() {
|
// This sets this.lastRead to a new Promise right away, as opposed to
|
// saying `await this.lastRead; this.lastRead = this.serialNext();` which
|
// would not work because this.nextRead would be updated only after the
|
// promise resolves.
|
this.lastRead = this.lastRead.then(() => this.serialNext());
|
return this.lastRead;
|
}
|
async serialNext() {
|
return this.upstream.next();
|
}
|
}
|
class SkipIterator extends LazyIterator {
|
constructor(upstream, maxCount) {
|
super();
|
this.upstream = upstream;
|
this.maxCount = maxCount;
|
// Local state that should not be clobbered by out-of-order execution.
|
this.count = 0;
|
this.lastRead = Promise.resolve({ value: null, done: false });
|
}
|
summary() {
|
return `${this.upstream.summary()} -> Skip`;
|
}
|
async next() {
|
// This sets this.lastRead to a new Promise right away, as opposed to
|
// saying `await this.lastRead; this.lastRead = this.serialNext();` which
|
// would not work because this.nextRead would be updated only after the
|
// promise resolves.
|
this.lastRead = this.lastRead.then(() => this.serialNext());
|
return this.lastRead;
|
}
|
async serialNext() {
|
// TODO(soergel): consider tradeoffs of reading in parallel, eg.
|
// collecting next() promises in an Array and then waiting for
|
// Promise.all() of those. Benefit: pseudo-parallel execution. Drawback:
|
// maybe delayed GC.
|
while (this.count++ < this.maxCount) {
|
const skipped = await this.upstream.next();
|
// short-circuit if upstream is already empty
|
if (skipped.done) {
|
return skipped;
|
}
|
tf.dispose(skipped.value);
|
}
|
return this.upstream.next();
|
}
|
}
|
class TakeIterator extends LazyIterator {
|
constructor(upstream, maxCount) {
|
super();
|
this.upstream = upstream;
|
this.maxCount = maxCount;
|
this.count = 0;
|
}
|
summary() {
|
return `${this.upstream.summary()} -> Take`;
|
}
|
async next() {
|
if (this.count++ >= this.maxCount) {
|
return { value: null, done: true };
|
}
|
return this.upstream.next();
|
}
|
}
|
// Note this batch just groups items into row-wise element arrays.
|
// Rotating these to a column-wise representation happens only at the dataset
|
// level.
|
class RowMajorBatchIterator extends LazyIterator {
|
constructor(upstream, batchSize, enableSmallLastBatch = true) {
|
super();
|
this.upstream = upstream;
|
this.batchSize = batchSize;
|
this.enableSmallLastBatch = enableSmallLastBatch;
|
this.lastRead = Promise.resolve({ value: null, done: false });
|
}
|
summary() {
|
return `${this.upstream.summary()} -> RowMajorBatch`;
|
}
|
async next() {
|
// This sets this.lastRead to a new Promise right away, as opposed to
|
// saying `await this.lastRead; this.lastRead = this.serialNext();` which
|
// would not work because this.nextRead would be updated only after the
|
// promise resolves.
|
this.lastRead = this.lastRead.then(() => this.serialNext());
|
return this.lastRead;
|
}
|
async serialNext() {
|
const batch = [];
|
while (batch.length < this.batchSize) {
|
const item = await this.upstream.next();
|
if (item.done) {
|
if (this.enableSmallLastBatch && batch.length > 0) {
|
return { value: batch, done: false };
|
}
|
return { value: null, done: true };
|
}
|
batch.push(item.value);
|
}
|
return { value: batch, done: false };
|
}
|
}
|
class FilterIterator extends LazyIterator {
|
constructor(upstream, predicate) {
|
super();
|
this.upstream = upstream;
|
this.predicate = predicate;
|
this.lastRead = Promise.resolve({ value: null, done: false });
|
}
|
summary() {
|
return `${this.upstream.summary()} -> Filter`;
|
}
|
async next() {
|
// This sets this.lastRead to a new Promise right away, as opposed to
|
// saying `await this.lastRead; this.lastRead = this.serialNext();` which
|
// would not work because this.nextRead would be updated only after the
|
// promise resolves.
|
this.lastRead = this.lastRead.then(() => this.serialNext());
|
return this.lastRead;
|
}
|
async serialNext() {
|
while (true) {
|
const item = await this.upstream.next();
|
if (item.done || this.predicate(item.value)) {
|
return item;
|
}
|
tf.dispose(item.value);
|
}
|
}
|
}
|
class MapIterator extends LazyIterator {
|
constructor(upstream, transform) {
|
super();
|
this.upstream = upstream;
|
this.transform = transform;
|
}
|
summary() {
|
return `${this.upstream.summary()} -> Map`;
|
}
|
async next() {
|
const item = await this.upstream.next();
|
if (item.done) {
|
return { value: null, done: true };
|
}
|
const inputTensors = tf.tensor_util.getTensorsInContainer(item.value);
|
// Careful: the transform may mutate the item in place.
|
// That's why we have to remember the input Tensors above, and then
|
// below dispose only those that were not passed through to the output.
|
// Note too that the transform function is responsible for tidying
|
// any intermediate Tensors. Here we are concerned only about the
|
// inputs.
|
const mapped = this.transform(item.value);
|
const outputTensors = tf.tensor_util.getTensorsInContainer(mapped);
|
// TODO(soergel) faster intersection
|
// TODO(soergel) move to tf.disposeExcept(in, out)?
|
for (const t of inputTensors) {
|
if (!tf.tensor_util.isTensorInList(t, outputTensors)) {
|
t.dispose();
|
}
|
}
|
return { value: mapped, done: false };
|
}
|
}
|
class ErrorHandlingLazyIterator extends LazyIterator {
|
constructor(upstream, handler) {
|
super();
|
this.upstream = upstream;
|
this.handler = handler;
|
this.count = 0;
|
this.lastRead = Promise.resolve({ value: null, done: false });
|
}
|
summary() {
|
return `${this.upstream.summary()} -> handleErrors`;
|
}
|
async next() {
|
// This sets this.lastRead to a new Promise right away, as opposed to
|
// saying `await this.lastRead; this.lastRead = this.serialNext();` which
|
// would not work because this.nextRead would be updated only after the
|
// promise resolves.
|
this.lastRead = this.lastRead.then(() => this.serialNext());
|
return this.lastRead;
|
}
|
async serialNext() {
|
while (true) {
|
try {
|
return await this.upstream.next();
|
}
|
catch (e) {
|
if (!this.handler(e)) {
|
return { value: null, done: true };
|
}
|
// If the handler returns true, loop and fetch the next upstream item.
|
// If the upstream iterator throws an endless stream of errors, and if
|
// the handler says to ignore them, then we loop forever here. That is
|
// the correct behavior-- it's up to the handler to decide when to stop.
|
}
|
}
|
}
|
}
|
class AsyncMapIterator extends LazyIterator {
|
constructor(upstream, transform) {
|
super();
|
this.upstream = upstream;
|
this.transform = transform;
|
}
|
summary() {
|
return `${this.upstream.summary()} -> AsyncMap`;
|
}
|
async next() {
|
const item = await this.upstream.next();
|
if (item.done) {
|
return { value: null, done: true };
|
}
|
const inputTensors = tf.tensor_util.getTensorsInContainer(item.value);
|
// Careful: the transform may mutate the item in place.
|
// That's why we have to remember the input Tensors above, and then
|
// below dispose only those that were not passed through to the output.
|
// Note too that the transform function is responsible for tidying
|
// any intermediate Tensors. Here we are concerned only about the
|
// inputs.
|
const mapped = await this.transform(item.value);
|
const outputTensors = tf.tensor_util.getTensorsInContainer(mapped);
|
// TODO(soergel) faster intersection
|
// TODO(soergel) move to tf.disposeExcept(in, out)?
|
for (const t of inputTensors) {
|
if (!tf.tensor_util.isTensorInList(t, outputTensors)) {
|
t.dispose();
|
}
|
}
|
return { value: mapped, done: false };
|
}
|
}
|
// Iterators that maintain a queue of pending items
|
// ============================================================================
|
/**
|
* A base class for transforming streams that operate by maintaining an
|
* output queue of elements that are ready to return via next(). This is
|
* commonly required when the transformation is 1-to-many: A call to next()
|
* may trigger a call to the underlying stream, which will produce many
|
* mapped elements of this stream-- of which we need to return only one, so
|
* we have to queue the rest.
|
*/
|
export class OneToManyIterator extends LazyIterator {
|
constructor() {
|
super();
|
this.outputQueue = new GrowingRingBuffer();
|
this.lastRead = Promise.resolve({ value: null, done: false });
|
}
|
async next() {
|
// This sets this.lastRead to a new Promise right away, as opposed to
|
// saying `await this.lastRead; this.lastRead = this.serialNext();` which
|
// would not work because this.nextRead would be updated only after the
|
// promise resolves.
|
this.lastRead = this.lastRead.then(() => this.serialNext());
|
return this.lastRead;
|
}
|
async serialNext() {
|
// Fetch so that the queue contains at least one item if possible.
|
// If the upstream source is exhausted, AND there are no items left in
|
// the output queue, then this stream is also exhausted.
|
while (this.outputQueue.length() === 0) {
|
// TODO(soergel): consider parallel reads.
|
if (!await this.pump()) {
|
return { value: null, done: true };
|
}
|
}
|
return { value: this.outputQueue.shift(), done: false };
|
}
|
}
|
class FlatmapIterator extends OneToManyIterator {
|
constructor(upstream, transform) {
|
super();
|
this.upstream = upstream;
|
this.transform = transform;
|
}
|
summary() {
|
return `${this.upstream.summary()} -> Flatmap`;
|
}
|
async pump() {
|
const item = await this.upstream.next();
|
if (item.done) {
|
return false;
|
}
|
const inputTensors = tf.tensor_util.getTensorsInContainer(item.value);
|
// Careful: the transform may mutate the item in place.
|
// that's why we have to remember the input Tensors above, and then
|
// below dispose only those that were not passed through to the output.
|
// Note too that the transform function is responsible for tidying any
|
// intermediate Tensors. Here we are concerned only about the inputs.
|
const mappedArray = this.transform(item.value);
|
const outputTensors = tf.tensor_util.getTensorsInContainer(mappedArray);
|
this.outputQueue.pushAll(mappedArray);
|
// TODO(soergel) faster intersection, and deduplicate outputTensors
|
// TODO(soergel) move to tf.disposeExcept(in, out)?
|
for (const t of inputTensors) {
|
if (!tf.tensor_util.isTensorInList(t, outputTensors)) {
|
t.dispose();
|
}
|
}
|
return true;
|
}
|
}
|
/**
|
* Provides a `LazyIterator` that concatenates a stream of underlying
|
* streams.
|
*
|
* Doing this in a concurrency-safe way requires some trickery. In
|
* particular, we want this stream to return the elements from the
|
* underlying streams in the correct order according to when next() was
|
* called, even if the resulting Promises resolve in a different order.
|
*/
|
export class ChainedIterator extends LazyIterator {
|
constructor(iterators, baseErrorHandler) {
|
super();
|
this.baseErrorHandler = baseErrorHandler;
|
// Strict Promise execution order:
|
// a next() call may not even begin until the previous one completes.
|
this.lastRead = null;
|
// Local state that should not be clobbered by out-of-order execution.
|
this.iterator = null;
|
this.moreIterators = iterators;
|
}
|
summary() {
|
const upstreamSummaries = 'TODO: fill in upstream of chained summaries';
|
return `${upstreamSummaries} -> Chained`;
|
}
|
async next() {
|
this.lastRead = this.readFromChain(this.lastRead);
|
return this.lastRead;
|
}
|
async readFromChain(lastRead) {
|
// Must await on the previous read since the previous read may have advanced
|
// the stream of streams, from which we need to read.
|
// This is unfortunate since we can't parallelize reads. Which means
|
// prefetching of chained streams is a no-op.
|
// One solution is to prefetch immediately upstream of this.
|
await lastRead;
|
if (this.iterator == null) {
|
const iteratorResult = await this.moreIterators.next();
|
if (iteratorResult.done) {
|
// No more streams to stream from.
|
return { value: null, done: true };
|
}
|
this.iterator = iteratorResult.value;
|
if (this.baseErrorHandler != null) {
|
this.iterator = this.iterator.handleErrors(this.baseErrorHandler);
|
}
|
}
|
const itemResult = await this.iterator.next();
|
if (itemResult.done) {
|
this.iterator = null;
|
return this.readFromChain(lastRead);
|
}
|
return itemResult;
|
}
|
}
|
export var ZipMismatchMode;
|
(function (ZipMismatchMode) {
|
ZipMismatchMode[ZipMismatchMode["FAIL"] = 0] = "FAIL";
|
ZipMismatchMode[ZipMismatchMode["SHORTEST"] = 1] = "SHORTEST";
|
ZipMismatchMode[ZipMismatchMode["LONGEST"] = 2] = "LONGEST"; // use nulls for exhausted streams; use up the longest stream.
|
})(ZipMismatchMode || (ZipMismatchMode = {}));
|
/**
|
* Provides a `LazyIterator` that zips together an array, dict, or nested
|
* structure of `LazyIterator`s (and perhaps additional constants).
|
*
|
* The underlying streams must provide elements in a consistent order such
|
* that they correspond.
|
*
|
* Typically, the underlying streams should have the same number of
|
* elements. If they do not, the behavior is determined by the
|
* `mismatchMode` argument.
|
*
|
* The nested structure of the `iterators` argument determines the
|
* structure of elements in the resulting iterator.
|
*
|
* Doing this in a concurrency-safe way requires some trickery. In
|
* particular, we want this stream to return the elements from the
|
* underlying streams in the correct order according to when next() was
|
* called, even if the resulting Promises resolve in a different order.
|
*
|
* @param iterators: An array or object containing LazyIterators at the
|
* leaves.
|
* @param mismatchMode: Determines what to do when one underlying iterator
|
* is exhausted before the others. `ZipMismatchMode.FAIL` (the default)
|
* causes an error to be thrown in this case. `ZipMismatchMode.SHORTEST`
|
* causes the zipped iterator to terminate with the furst underlying
|
* streams, so elements remaining on the longer streams are ignored.
|
* `ZipMismatchMode.LONGEST` causes the zipped stream to continue, filling
|
* in nulls for the exhausted streams, until all streams are exhausted.
|
*/
|
class ZipIterator extends LazyIterator {
|
constructor(iterators, mismatchMode = ZipMismatchMode.FAIL) {
|
super();
|
this.iterators = iterators;
|
this.mismatchMode = mismatchMode;
|
this.count = 0;
|
this.currentPromise = null;
|
}
|
summary() {
|
const upstreamSummaries = 'TODO: fill in upstream of zip summaries';
|
return `{${upstreamSummaries}} -> Zip`;
|
}
|
async nextState(afterState) {
|
// This chaining ensures that the underlying next() are not even called
|
// before the previous ones have resolved.
|
await afterState;
|
// Collect underlying iterator "done" signals as a side effect in
|
// getNext()
|
let numIterators = 0;
|
let iteratorsDone = 0;
|
function getNext(container) {
|
if (container instanceof LazyIterator) {
|
const result = container.next();
|
return {
|
value: result.then(x => {
|
numIterators++;
|
if (x.done) {
|
iteratorsDone++;
|
}
|
return x.value;
|
}),
|
recurse: false
|
};
|
}
|
else {
|
return { value: null, recurse: true };
|
}
|
}
|
const mapped = await deepMapAndAwaitAll(this.iterators, getNext);
|
if (numIterators === iteratorsDone) {
|
// The streams have all ended.
|
return { value: null, done: true };
|
}
|
if (iteratorsDone > 0) {
|
switch (this.mismatchMode) {
|
case ZipMismatchMode.FAIL:
|
throw new Error('Zipped streams should have the same length. ' +
|
`Mismatched at element ${this.count}.`);
|
case ZipMismatchMode.SHORTEST:
|
return { value: null, done: true };
|
case ZipMismatchMode.LONGEST:
|
default:
|
// Continue. The exhausted streams already produced value: null.
|
}
|
}
|
this.count++;
|
return { value: mapped, done: false };
|
}
|
async next() {
|
this.currentPromise = this.nextState(this.currentPromise);
|
return this.currentPromise;
|
}
|
}
|
// Iterators that maintain a ring buffer of pending promises
|
// ============================================================================
|
/**
|
* A stream that prefetches a given number of items from an upstream source,
|
* returning them in FIFO order.
|
*
|
* Note this prefetches Promises, but makes no guarantees about when those
|
* Promises resolve.
|
*/
|
export class PrefetchIterator extends LazyIterator {
|
constructor(upstream, bufferSize) {
|
super();
|
this.upstream = upstream;
|
this.bufferSize = bufferSize;
|
this.buffer = new RingBuffer(bufferSize);
|
}
|
summary() {
|
return `${this.upstream.summary()} -> Prefetch`;
|
}
|
/**
|
* Refill the prefetch buffer. Returns only after the buffer is full, or
|
* the upstream source is exhausted.
|
*/
|
refill() {
|
while (!this.buffer.isFull()) {
|
const v = this.upstream.next();
|
this.buffer.push(v);
|
}
|
}
|
next() {
|
this.refill();
|
// This shift will never throw an error because the buffer is always
|
// full after a refill. If the stream is exhausted, the buffer will be
|
// full of Promises that will resolve to the end-of-stream signal.
|
return this.buffer.shift();
|
}
|
}
|
/**
|
* A stream that performs a sliding-window random shuffle on an upstream
|
* source. This is like a `PrefetchIterator` except that the items are
|
* returned in randomized order. Mixing naturally improves as the buffer
|
* size increases.
|
*/
|
export class ShuffleIterator extends PrefetchIterator {
|
constructor(upstream, windowSize, seed) {
|
super(upstream, windowSize);
|
this.upstream = upstream;
|
this.windowSize = windowSize;
|
// Local state that should not be clobbered by out-of-order execution.
|
this.upstreamExhausted = false;
|
this.random = seedrandom.alea(seed || tf.util.now().toString());
|
this.lastRead = Promise.resolve({ value: null, done: false });
|
}
|
async next() {
|
// This sets this.lastRead to a new Promise right away, as opposed to
|
// saying `await this.lastRead; this.lastRead = this.serialNext();` which
|
// would not work because this.nextRead would be updated only after the
|
// promise resolves.
|
this.lastRead = this.lastRead.then(() => this.serialNext());
|
return this.lastRead;
|
}
|
randomInt(max) {
|
return Math.floor(this.random() * max);
|
}
|
chooseIndex() {
|
return this.randomInt(this.buffer.length());
|
}
|
async serialNext() {
|
// TODO(soergel): consider performance
|
if (!this.upstreamExhausted) {
|
this.refill();
|
}
|
while (!this.buffer.isEmpty()) {
|
const chosenIndex = this.chooseIndex();
|
const result = await this.buffer.shuffleExcise(chosenIndex);
|
if (result.done) {
|
this.upstreamExhausted = true;
|
}
|
else {
|
this.refill();
|
return result;
|
}
|
}
|
return { value: null, done: true };
|
}
|
}
|
//# sourceMappingURL=data:application/json;base64,{"version":3,"file":"lazy_iterator.js","sourceRoot":"","sources":["../../../../../../tfjs-data/src/iterators/lazy_iterator.ts"],"names":[],"mappings":"AAAA;;;;;;;;;;;;;;;;GAgBG;AAEH,OAAO,KAAK,EAAE,MAAM,uBAAuB,CAAC;AAC5C,OAAO,KAAK,UAAU,MAAM,YAAY,CAAC;AAGzC,OAAO,EAAC,SAAS,EAAC,MAAM,oBAAoB,CAAC;AAC7C,OAAO,EAAC,kBAAkB,EAAqC,OAAO,EAAE,SAAS,EAAC,MAAM,kBAAkB,CAAC;AAC3G,OAAO,EAAC,iBAAiB,EAAC,MAAM,6BAA6B,CAAC;AAC9D,OAAO,EAAC,UAAU,EAAC,MAAM,qBAAqB,CAAC;AAO/C,oDAAoD;AACpD,kEAAkE;AAClE,0DAA0D;AAE1D;;GAEG;AACH,MAAM,UAAU,iBAAiB,CAAI,KAAU;IAC7C,OAAO,IAAI,aAAa,CAAC,KAAK,CAAC,CAAC;AAClC,CAAC;AAED;;GAEG;AACH,MAAM,UAAU,wBAAwB,CAAC,KAAa;IACpD,IAAI,CAAC,GAAG,KAAK,CAAC;IACd,OAAO,oBAAoB,CAAC,GAAG,EAAE,CAAC,CAAC,EAAC,KAAK,EAAE,CAAC,EAAE,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC,CAAC;AACjE,CAAC;AAED;;;;;;;;;;;;GAYG;AACH,MAAM,UAAU,oBAAoB,CAChC,IACiD;IACnD,OAAO,IAAI,oBAAoB,CAAC,IAAI,CAAC,CAAC;AACxC,CAAC;AAED;;;;;;;;;;;GAWG;AACH,MAAM,UAAU,wBAAwB,CACpC,aAA4C,EAC5C,gBAAwC;IAC1C,OAAO,IAAI,eAAe,CAAC,aAAa,EAAE,gBAAgB,CAAC,CAAC;AAC9D,CAAC;AAED;;;;;;;;;;;;;;;GAeG;AACH,MAAM,UAAU,gCAAgC,CAC5C,YAAmD,EAAE,KAAa,EAClE,gBAAwC;IAC1C,OAAO,wBAAwB,CAC3B,oBAAoB,CAAC,YAAY,CAAC,CAAC,IAAI,CAAC,KAAK,CAAC,EAAE,gBAAgB,CAAC,CAAC;AACxE,CAAC;AAED;;;;;;;;;;;;;;;;;;;;;;;GAuBG;AACH,MAAM,UAAU,kBAAkB,CAC9B,SAA4B,EAC5B,eAAgC,eAAe,CAAC,IAAI;IACtD,OAAO,IAAI,WAAW,CAAI,SAAS,EAAE,YAAY,CAAC,CAAC;AACrD,CAAC;AAED;;;;;;GAMG;AACH,MAAM,OAAgB,YAAY;IAgBhC;;;;;;;OAOG;IACH,KAAK,CAAC,OAAO;QACX,MAAM,MAAM,GAAQ,EAAE,CAAC;QACvB,IAAI,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;QAC1B,OAAO,CAAC,CAAC,CAAC,IAAI,EAAE;YACd,MAAM,CAAC,IAAI,CAAC,CAAC,CAAC,KAAK,CAAC,CAAC;YACrB,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;SACvB;QACD,OAAO,MAAM,CAAC;IAChB,CAAC;IAED;;;;;;;;;;OAUG;IACH,KAAK,CAAC,cAAc;QAClB,MAAM,MAAM,GAAG,IAAI,CAAC,QAAQ,CAAC,GAAG,CAAC,CAAC;QAClC,MAAM,MAAM,GAAQ,EAAE,CAAC;QACvB,IAAI,CAAC,GAAG,MAAM,MAAM,CAAC,IAAI,EAAE,CAAC;QAC5B,OAAO,CAAC,CAAC,CAAC,IAAI,EAAE;YACd,MAAM,CAAC,IAAI,CAAC,CAAC,CAAC,KAAK,CAAC,CAAC;YACrB,CAAC,GAAG,MAAM,MAAM,CAAC,IAAI,EAAE,CAAC;SACzB;QACD,OAAO,MAAM,CAAC;IAChB,CAAC;IAED;;;;;;OAMG;IACH,KAAK,CAAC,YAAY;QAChB,IAAI,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;QAC1B,OAAO,CAAC,CAAC,CAAC,IAAI,EAAE;YACd,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;SACvB;IACH,CAAC;IAED;;;;;;OAMG;IACH,KAAK,CAAC,YAAY,CAAC,SAA4B;QAC7C,IAAI,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;QAC1B,IAAI,cAAc,GAAG,SAAS,CAAC,CAAC,CAAC,KAAK,CAAC,CAAC;QACxC,OAAO,CAAC,CAAC,CAAC,CAAC,IAAI,CAAC,IAAI,cAAc,EAAE;YAClC,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;YACtB,cAAc,GAAG,SAAS,CAAC,CAAC,CAAC,KAAK,CAAC,CAAC;SACrC;IACH,CAAC;IAED;;;;;;;;;;;OAWG;IACH,YAAY,CAAC,OAAkC;QAC7C,OAAO,IAAI,yBAAyB,CAAC,IAAI,EAAE,OAAO,CAAC,CAAC;IACtD,CAAC;IAED,yCAAyC;IAEzC;;;;;;;OAOG;IACH,MAAM,CAAC,SAAgC;QACrC,OAAO,IAAI,cAAc,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;IAC7C,CAAC;IAED;;;;;;;OAOG;IACH,GAAG,CAAI,SAA0B;QAC/B,OAAO,IAAI,WAAW,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;IAC1C,CAAC;IAED;;;;;;;OAOG;IACH,QAAQ,CAAI,SAAmC;QAC7C,OAAO,IAAI,gBAAgB,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;IAC/C,CAAC;IAED;;;;;;;OAOG;IACH,cAAc,CAAI,SAAmC;QACnD,OAAO,IAAI,gBAAgB,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC,MAAM,EAAE,CAAC;IACxD,CAAC;IAED;;;;;;;OAOG;IACH,OAAO,CAAI,SAA4B;QACrC,OAAO,IAAI,eAAe,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;IAC9C,CAAC;IAED;;;;OAIG;IACH,KAAK,CAAC,YAAY,CAAC,CAAqB;QACtC,OAAO,IAAI,CAAC,GAAG,CAAC,CAAC,CAAC,CAAC,YAAY,EAAE,CAAC;IACpC,CAAC;IAED;;;;;;OAMG;IACH,KAAK,CAAC,aAAa,CAAC,CAAiC;QACnD,OAAO,IAAI,CAAC,cAAc,CAAC,CAAC,CAAC,CAAC,YAAY,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,KAAK,IAAI,CAAC,CAAC,CAAC;IAChE,CAAC;IAED;;;;;;;;;;;;;;;;;OAiBG;IACH,aAAa,CAAC,SAAiB,EAAE,cAAc,GAAG,IAAI;QACpD,OAAO,IAAI,qBAAqB,CAAC,IAAI,EAAE,SAAS,EAAE,cAAc,CAAC,CAAC;IACpE,CAAC;IAED;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;OA+BG;IACH,gBAAgB,CACZ,SAAiB,EAAE,cAAc,GAAG,IAAI;IACxC,kCAAkC;IAClC,QAAsC,SAAS;QAEjD,2EAA2E;QAC3E,MAAM,UAAU,GAAG,IAAI,CAAC,aAAa,CAAC,SAAS,EAAE,cAAc,CAAC,CAAC;QACjE,2EAA2E;QAC3E,uEAAuE;QACvE,OAAO,UAAU,CAAC,GAAG,CAAC,CAAC,CAAC,EAAE,CAAC,OAAO,CAAC,CAAC,EAAE,KAAK,CAAC,CAAC,CAAC;IAChD,CAAC;IAED;;;;;;;;;OASG;IACH,WAAW,CACP,QAAyB,EACzB,gBAAwC;QAC1C,OAAO,IAAI,eAAe,CACtB,iBAAiB,CAAC,CAAC,IAAI,EAAE,QAAQ,CAAC,CAAC,EAAE,gBAAgB,CAAC,CAAC;IAC7D,CAAC;IAED;;;;;;OAMG;IACH,IAAI,CAAC,KAAa;QAChB,IAAI,KAAK,GAAG,CAAC,IAAI,KAAK,IAAI,IAAI,EAAE;YAC9B,OAAO,IAAI,CAAC;SACb;QACD,OAAO,IAAI,YAAY,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;IACvC,CAAC;IAED;;;;;OAKG;IACH,IAAI,CAAC,KAAa;QAChB,IAAI,KAAK,GAAG,CAAC,IAAI,KAAK,IAAI,IAAI,EAAE;YAC9B,OAAO,IAAI,CAAC;SACb;QACD,OAAO,IAAI,YAAY,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;IACvC,CAAC;IAED;;;;;;;;OAQG;IACH,QAAQ,CAAC,UAAkB;QACzB,OAAO,IAAI,gBAAgB,CAAC,IAAI,EAAE,UAAU,CAAC,CAAC;IAChD,CAAC;IAED,uDAAuD;IAEvD;;;;;;;OAOG;IACH,OAAO,CAAC,UAAkB,EAAE,IAAa;QACvC,OAAO,IAAI,eAAe,CAAC,IAAI,EAAE,UAAU,EAAE,IAAI,CAAC,CAAC;IACrD,CAAC;IAED;;;OAGG;IACH,MAAM;QACJ,OAAO,IAAI,cAAc,CAAC,IAAI,CAAC,CAAC;IAClC,CAAC;CACF;AAED,+EAA+E;AAC/E,yEAAyE;AACzE,0EAA0E;AAC1E,kDAAkD;AAClD,+EAA+E;AAE/E,mDAAmD;AACnD,+EAA+E;AAE/E,MAAM,aAAiB,SAAQ,YAAe;IAE5C,YAAsB,KAAU;QAC9B,KAAK,EAAE,CAAC;QADY,UAAK,GAAL,KAAK,CAAK;QADxB,SAAI,GAAG,CAAC,CAAC;IAGjB,CAAC;IAED,OAAO;QACL,OAAO,YAAY,IAAI,CAAC,KAAK,CAAC,MAAM,QAAQ,CAAC;IAC/C,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI,IAAI,CAAC,IAAI,IAAI,IAAI,CAAC,KAAK,CAAC,MAAM,EAAE;YAClC,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;SAClC;QACD,MAAM,IAAI,GAAG,IAAI,CAAC,KAAK,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC;QACnC,IAAI,CAAC,IAAI,EAAE,CAAC;QACZ,OAAO,EAAC,KAAK,EAAE,SAAS,CAAC,IAAI,CAAC,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IAC/C,CAAC;CACF;AAED,MAAM,oBAAwB,SAAQ,YAAe;IACnD,YACc,MAA2D;QACvE,KAAK,EAAE,CAAC;QADI,WAAM,GAAN,MAAM,CAAqD;IAEzE,CAAC;IAED,OAAO;QACL,OAAO,eAAe,CAAC;IACzB,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI;YACF,OAAO,IAAI,CAAC,MAAM,EAAE,CAAC;SACtB;QAAC,OAAO,CAAC,EAAE;YACV,4DAA4D;YAC5D,CAAC,CAAC,OAAO;gBACL,mDAAmD,CAAC,CAAC,OAAO,EAAE,CAAC;YACnE,MAAM,CAAC,CAAC;SACT;IACH,CAAC;CACF;AAED,MAAM,cAAkB,SAAQ,YAAe;IAK7C,YAAsB,QAAyB;QAC7C,KAAK,EAAE,CAAC;QADY,aAAQ,GAAR,QAAQ,CAAiB;QAE7C,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,YAAY,CAAC;IAChD,CAAC;IAED,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,KAAK,CAAC,UAAU;QACtB,OAAO,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;IAC9B,CAAC;CACF;AAED,MAAM,YAAgB,SAAQ,YAAe;IAQ3C,YAAsB,QAAyB,EAAY,QAAgB;QACzE,KAAK,EAAE,CAAC;QADY,aAAQ,GAAR,QAAQ,CAAiB;QAAY,aAAQ,GAAR,QAAQ,CAAQ;QAH3E,sEAAsE;QACtE,UAAK,GAAG,CAAC,CAAC;QAIR,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,UAAU,CAAC;IAC9C,CAAC;IAED,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,KAAK,CAAC,UAAU;QACtB,gEAAgE;QAChE,8DAA8D;QAC9D,yEAAyE;QACzE,oBAAoB;QACpB,OAAO,IAAI,CAAC,KAAK,EAAE,GAAG,IAAI,CAAC,QAAQ,EAAE;YACnC,MAAM,OAAO,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;YAC3C,6CAA6C;YAC7C,IAAI,OAAO,CAAC,IAAI,EAAE;gBAChB,OAAO,OAAO,CAAC;aAChB;YACD,EAAE,CAAC,OAAO,CAAC,OAAO,CAAC,KAAW,CAAC,CAAC;SACjC;QACD,OAAO,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;IAC9B,CAAC;CACF;AAED,MAAM,YAAgB,SAAQ,YAAe;IAE3C,YAAsB,QAAyB,EAAY,QAAgB;QACzE,KAAK,EAAE,CAAC;QADY,aAAQ,GAAR,QAAQ,CAAiB;QAAY,aAAQ,GAAR,QAAQ,CAAQ;QAD3E,UAAK,GAAG,CAAC,CAAC;IAGV,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,UAAU,CAAC;IAC9C,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI,IAAI,CAAC,KAAK,EAAE,IAAI,IAAI,CAAC,QAAQ,EAAE;YACjC,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;SAClC;QACD,OAAO,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;IAC9B,CAAC;CACF;AAED,kEAAkE;AAClE,6EAA6E;AAC7E,SAAS;AACT,MAAM,qBAAyB,SAAQ,YAAiB;IAKtD,YACc,QAAyB,EAAY,SAAiB,EACtD,uBAAuB,IAAI;QACvC,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QAAY,cAAS,GAAT,SAAS,CAAQ;QACtD,yBAAoB,GAApB,oBAAoB,CAAO;QAEvC,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,mBAAmB,CAAC;IACvD,CAAC;IAED,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,KAAK,CAAC,UAAU;QACtB,MAAM,KAAK,GAAQ,EAAE,CAAC;QACtB,OAAO,KAAK,CAAC,MAAM,GAAG,IAAI,CAAC,SAAS,EAAE;YACpC,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;YACxC,IAAI,IAAI,CAAC,IAAI,EAAE;gBACb,IAAI,IAAI,CAAC,oBAAoB,IAAI,KAAK,CAAC,MAAM,GAAG,CAAC,EAAE;oBACjD,OAAO,EAAC,KAAK,EAAE,KAAK,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;iBACpC;gBACD,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;aAClC;YACD,KAAK,CAAC,IAAI,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;SACxB;QACD,OAAO,EAAC,KAAK,EAAE,KAAK,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IACrC,CAAC;CACF;AAED,MAAM,cAAkB,SAAQ,YAAe;IAK7C,YACc,QAAyB,EACzB,SAAgC;QAC5C,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QACzB,cAAS,GAAT,SAAS,CAAuB;QAE5C,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,YAAY,CAAC;IAChD,CAAC;IAED,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,KAAK,CAAC,UAAU;QACtB,OAAO,IAAI,EAAE;YACX,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;YACxC,IAAI,IAAI,CAAC,IAAI,IAAI,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,KAAK,CAAC,EAAE;gBAC3C,OAAO,IAAI,CAAC;aACb;YACD,EAAE,CAAC,OAAO,CAAC,IAAI,CAAC,KAAW,CAAC,CAAC;SAC9B;IACH,CAAC;CACF;AAED,MAAM,WAAkB,SAAQ,YAAe;IAC7C,YACc,QAAyB,EACzB,SAA0B;QACtC,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QACzB,cAAS,GAAT,SAAS,CAAiB;IAExC,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,SAAS,CAAC;IAC7C,CAAC;IAED,KAAK,CAAC,IAAI;QACR,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;QACxC,IAAI,IAAI,CAAC,IAAI,EAAE;YACb,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;SAClC;QACD,MAAM,YAAY,GAAG,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,IAAI,CAAC,KAAW,CAAC,CAAC;QAC5E,uDAAuD;QACvD,mEAAmE;QACnE,uEAAuE;QACvE,kEAAkE;QAClE,kEAAkE;QAClE,UAAU;QACV,MAAM,MAAM,GAAG,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;QAC1C,MAAM,aAAa,GAAG,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,MAAY,CAAC,CAAC;QAEzE,oCAAoC;QACpC,mDAAmD;QACnD,KAAK,MAAM,CAAC,IAAI,YAAY,EAAE;YAC5B,IAAI,CAAC,EAAE,CAAC,WAAW,CAAC,cAAc,CAAC,CAAC,EAAE,aAAa,CAAC,EAAE;gBACpD,CAAC,CAAC,OAAO,EAAE,CAAC;aACb;SACF;QACD,OAAO,EAAC,KAAK,EAAE,MAAM,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IACtC,CAAC;CACF;AAED,MAAM,yBAA6B,SAAQ,YAAe;IAExD,YACc,QAAyB,EACzB,OAAkC;QAC9C,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QACzB,YAAO,GAAP,OAAO,CAA2B;QAHhD,UAAK,GAAG,CAAC,CAAC;QAKR,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,kBAAkB,CAAC;IACtD,CAAC;IAMD,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAED,KAAK,CAAC,UAAU;QACd,OAAO,IAAI,EAAE;YACX,IAAI;gBACF,OAAO,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;aACnC;YAAC,OAAO,CAAC,EAAE;gBACV,IAAI,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC,CAAC,EAAE;oBACpB,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;iBAClC;gBACD,sEAAsE;gBAEtE,sEAAsE;gBACtE,uEAAuE;gBACvE,wEAAwE;aACzE;SACF;IACH,CAAC;CACF;AAED,MAAM,gBAAuB,SAAQ,YAAe;IAClD,YACc,QAAyB,EACzB,SAAmC;QAC/C,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QACzB,cAAS,GAAT,SAAS,CAA0B;IAEjD,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,cAAc,CAAC;IAClD,CAAC;IAED,KAAK,CAAC,IAAI;QACR,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;QACxC,IAAI,IAAI,CAAC,IAAI,EAAE;YACb,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;SAClC;QACD,MAAM,YAAY,GAAG,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,IAAI,CAAC,KAAW,CAAC,CAAC;QAC5E,uDAAuD;QACvD,mEAAmE;QACnE,uEAAuE;QACvE,kEAAkE;QAClE,kEAAkE;QAClE,UAAU;QACV,MAAM,MAAM,GAAG,MAAM,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;QAChD,MAAM,aAAa,GAAG,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,MAAY,CAAC,CAAC;QAEzE,oCAAoC;QACpC,mDAAmD;QACnD,KAAK,MAAM,CAAC,IAAI,YAAY,EAAE;YAC5B,IAAI,CAAC,EAAE,CAAC,WAAW,CAAC,cAAc,CAAC,CAAC,EAAE,aAAa,CAAC,EAAE;gBACpD,CAAC,CAAC,OAAO,EAAE,CAAC;aACb;SACF;QACD,OAAO,EAAC,KAAK,EAAE,MAAM,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IACtC,CAAC;CACF;AAED,mDAAmD;AACnD,+EAA+E;AAE/E;;;;;;;GAOG;AACH,MAAM,OAAgB,iBAAqB,SAAQ,YAAe;IAQhE;QACE,KAAK,EAAE,CAAC;QACR,IAAI,CAAC,WAAW,GAAG,IAAI,iBAAiB,EAAK,CAAC;QAC9C,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAgBD,KAAK,CAAC,UAAU;QACd,kEAAkE;QAClE,sEAAsE;QACtE,wDAAwD;QACxD,OAAO,IAAI,CAAC,WAAW,CAAC,MAAM,EAAE,KAAK,CAAC,EAAE;YACtC,0CAA0C;YAC1C,IAAI,CAAC,MAAM,IAAI,CAAC,IAAI,EAAE,EAAE;gBACtB,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;aAClC;SACF;QACD,OAAO,EAAC,KAAK,EAAE,IAAI,CAAC,WAAW,CAAC,KAAK,EAAE,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IACxD,CAAC;CACF;AACD,MAAM,eAAsB,SAAQ,iBAAoB;IACtD,YACc,QAAyB,EACzB,SAA4B;QACxC,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QACzB,cAAS,GAAT,SAAS,CAAmB;IAE1C,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,aAAa,CAAC;IACjD,CAAC;IAED,KAAK,CAAC,IAAI;QACR,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;QACxC,IAAI,IAAI,CAAC,IAAI,EAAE;YACb,OAAO,KAAK,CAAC;SACd;QACD,MAAM,YAAY,GAAG,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,IAAI,CAAC,KAAW,CAAC,CAAC;QAC5E,uDAAuD;QACvD,mEAAmE;QACnE,uEAAuE;QACvE,sEAAsE;QACtE,sEAAsE;QACtE,MAAM,WAAW,GAAG,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;QAC/C,MAAM,aAAa,GACf,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,WAAiB,CAAC,CAAC;QAC5D,IAAI,CAAC,WAAW,CAAC,OAAO,CAAC,WAAW,CAAC,CAAC;QAEtC,mEAAmE;QACnE,mDAAmD;QACnD,KAAK,MAAM,CAAC,IAAI,YAAY,EAAE;YAC5B,IAAI,CAAC,EAAE,CAAC,WAAW,CAAC,cAAc,CAAC,CAAC,EAAE,aAAa,CAAC,EAAE;gBACpD,CAAC,CAAC,OAAO,EAAE,CAAC;aACb;SACF;QAED,OAAO,IAAI,CAAC;IACd,CAAC;CACF;AAED;;;;;;;;GAQG;AACH,MAAM,OAAO,eAAmB,SAAQ,YAAe;IASrD,YACI,SAAwC,EACvB,gBAAwC;QAC3D,KAAK,EAAE,CAAC;QADW,qBAAgB,GAAhB,gBAAgB,CAAwB;QAV7D,kCAAkC;QAClC,qEAAqE;QAC7D,aAAQ,GAA+B,IAAI,CAAC;QAEpD,sEAAsE;QAC9D,aAAQ,GAAoB,IAAI,CAAC;QAOvC,IAAI,CAAC,aAAa,GAAG,SAAS,CAAC;IACjC,CAAC;IAED,OAAO;QACL,MAAM,iBAAiB,GAAG,6CAA6C,CAAC;QACxE,OAAO,GAAG,iBAAiB,aAAa,CAAC;IAC3C,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,aAAa,CAAC,IAAI,CAAC,QAAQ,CAAC,CAAC;QAClD,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,KAAK,CAAC,aAAa,CAAC,QAAoC;QAE9D,4EAA4E;QAC5E,qDAAqD;QACrD,oEAAoE;QACpE,6CAA6C;QAC7C,4DAA4D;QAC5D,MAAM,QAAQ,CAAC;QACf,IAAI,IAAI,CAAC,QAAQ,IAAI,IAAI,EAAE;YACzB,MAAM,cAAc,GAAG,MAAM,IAAI,CAAC,aAAa,CAAC,IAAI,EAAE,CAAC;YACvD,IAAI,cAAc,CAAC,IAAI,EAAE;gBACvB,kCAAkC;gBAClC,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;aAClC;YACD,IAAI,CAAC,QAAQ,GAAG,cAAc,CAAC,KAAK,CAAC;YACrC,IAAI,IAAI,CAAC,gBAAgB,IAAI,IAAI,EAAE;gBACjC,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,YAAY,CAAC,IAAI,CAAC,gBAAgB,CAAC,CAAC;aACnE;SACF;QACD,MAAM,UAAU,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;QAC9C,IAAI,UAAU,CAAC,IAAI,EAAE;YACnB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC;YACrB,OAAO,IAAI,CAAC,aAAa,CAAC,QAAQ,CAAC,CAAC;SACrC;QACD,OAAO,UAAU,CAAC;IACpB,CAAC;CACF;AAED,MAAM,CAAN,IAAY,eAIX;AAJD,WAAY,eAAe;IACzB,qDAAI,CAAA;IACJ,6DAAQ,CAAA;IACR,2DAAO,CAAA,CAAI,8DAA8D;AAC3E,CAAC,EAJW,eAAe,KAAf,eAAe,QAI1B;AAED;;;;;;;;;;;;;;;;;;;;;;;;;;;;GA4BG;AACH,MAAM,WAA0C,SAAQ,YAAe;IAIrE,YACuB,SAA4B,EAC5B,eAAgC,eAAe,CAAC,IAAI;QACzE,KAAK,EAAE,CAAC;QAFa,cAAS,GAAT,SAAS,CAAmB;QAC5B,iBAAY,GAAZ,YAAY,CAAwC;QALnE,UAAK,GAAG,CAAC,CAAC;QACV,mBAAc,GAA+B,IAAI,CAAC;IAM1D,CAAC;IAED,OAAO;QACL,MAAM,iBAAiB,GAAG,yCAAyC,CAAC;QACpE,OAAO,IAAI,iBAAiB,UAAU,CAAC;IACzC,CAAC;IAEO,KAAK,CAAC,SAAS,CAAC,UAAsC;QAE5D,uEAAuE;QACvE,0CAA0C;QAC1C,MAAM,UAAU,CAAC;QAEjB,iEAAiE;QACjE,YAAY;QACZ,IAAI,YAAY,GAAG,CAAC,CAAC;QACrB,IAAI,aAAa,GAAG,CAAC,CAAC;QAEtB,SAAS,OAAO,CAAC,SAA4B;YAC3C,IAAI,SAAS,YAAY,YAAY,EAAE;gBACrC,MAAM,MAAM,GAAG,SAAS,CAAC,IAAI,EAAE,CAAC;gBAChC,OAAO;oBACL,KAAK,EAAE,MAAM,CAAC,IAAI,CAAC,CAAC,CAAC,EAAE;wBACrB,YAAY,EAAE,CAAC;wBACf,IAAI,CAAC,CAAC,IAAI,EAAE;4BACV,aAAa,EAAE,CAAC;yBACjB;wBACD,OAAO,CAAC,CAAC,KAAK,CAAC;oBACjB,CAAC,CAAC;oBACF,OAAO,EAAE,KAAK;iBACf,CAAC;aACH;iBAAM;gBACL,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,OAAO,EAAE,IAAI,EAAC,CAAC;aACrC;QACH,CAAC;QAED,MAAM,MAAM,GAAM,MAAM,kBAAkB,CAAC,IAAI,CAAC,SAAS,EAAE,OAAO,CAAC,CAAC;QAEpE,IAAI,YAAY,KAAK,aAAa,EAAE;YAClC,8BAA8B;YAC9B,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;SAClC;QACD,IAAI,aAAa,GAAG,CAAC,EAAE;YACrB,QAAQ,IAAI,CAAC,YAAY,EAAE;gBACzB,KAAK,eAAe,CAAC,IAAI;oBACvB,MAAM,IAAI,KAAK,CACX,8CAA8C;wBAC9C,yBAAyB,IAAI,CAAC,KAAK,GAAG,CAAC,CAAC;gBAC9C,KAAK,eAAe,CAAC,QAAQ;oBAC3B,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;gBACnC,KAAK,eAAe,CAAC,OAAO,CAAC;gBAC7B,QAAQ;gBACN,iEAAiE;aACpE;SACF;QAED,IAAI,CAAC,KAAK,EAAE,CAAC;QACb,OAAO,EAAC,KAAK,EAAE,MAAM,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IACtC,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI,CAAC,cAAc,GAAG,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,cAAc,CAAC,CAAC;QAC1D,OAAO,IAAI,CAAC,cAAc,CAAC;IAC7B,CAAC;CACF;AAED,4DAA4D;AAC5D,+EAA+E;AAE/E;;;;;;GAMG;AACH,MAAM,OAAO,gBAAoB,SAAQ,YAAe;IAGtD,YACc,QAAyB,EAAY,UAAkB;QACnE,KAAK,EAAE,CAAC;QADI,aAAQ,GAAR,QAAQ,CAAiB;QAAY,eAAU,GAAV,UAAU,CAAQ;QAEnE,IAAI,CAAC,MAAM,GAAG,IAAI,UAAU,CAA6B,UAAU,CAAC,CAAC;IACvE,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,cAAc,CAAC;IAClD,CAAC;IAED;;;OAGG;IACO,MAAM;QACd,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,MAAM,EAAE,EAAE;YAC5B,MAAM,CAAC,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;YAC/B,IAAI,CAAC,MAAM,CAAC,IAAI,CAAC,CAAC,CAAC,CAAC;SACrB;IACH,CAAC;IAED,IAAI;QACF,IAAI,CAAC,MAAM,EAAE,CAAC;QACd,oEAAoE;QACpE,sEAAsE;QACtE,kEAAkE;QAClE,OAAO,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;IAC7B,CAAC;CACF;AAED;;;;;GAKG;AACH,MAAM,OAAO,eAAmB,SAAQ,gBAAmB;IAUzD,YACqB,QAAyB,EAAY,UAAkB,EACxE,IAAa;QACf,KAAK,CAAC,QAAQ,EAAE,UAAU,CAAC,CAAC;QAFT,aAAQ,GAAR,QAAQ,CAAiB;QAAY,eAAU,GAAV,UAAU,CAAQ;QAJ5E,sEAAsE;QAC9D,sBAAiB,GAAG,KAAK,CAAC;QAMhC,IAAI,CAAC,MAAM,GAAG,UAAU,CAAC,IAAI,CAAC,IAAI,IAAI,EAAE,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,QAAQ,EAAE,CAAC,CAAC;QAChE,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAEQ,KAAK,CAAC,IAAI;QACjB,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,SAAS,CAAC,GAAW;QAC3B,OAAO,IAAI,CAAC,KAAK,CAAC,IAAI,CAAC,MAAM,EAAE,GAAG,GAAG,CAAC,CAAC;IACzC,CAAC;IAES,WAAW;QACnB,OAAO,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,MAAM,CAAC,MAAM,EAAE,CAAC,CAAC;IAC9C,CAAC;IAED,KAAK,CAAC,UAAU;QACd,sCAAsC;QACtC,IAAI,CAAC,IAAI,CAAC,iBAAiB,EAAE;YAC3B,IAAI,CAAC,MAAM,EAAE,CAAC;SACf;QACD,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,OAAO,EAAE,EAAE;YAC7B,MAAM,WAAW,GAAG,IAAI,CAAC,WAAW,EAAE,CAAC;YACvC,MAAM,MAAM,GAAG,MAAM,IAAI,CAAC,MAAM,CAAC,aAAa,CAAC,WAAW,CAAC,CAAC;YAC5D,IAAI,MAAM,CAAC,IAAI,EAAE;gBACf,IAAI,CAAC,iBAAiB,GAAG,IAAI,CAAC;aAC/B;iBAAM;gBACL,IAAI,CAAC,MAAM,EAAE,CAAC;gBACd,OAAO,MAAM,CAAC;aACf;SACF;QACD,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;IACnC,CAAC;CACF","sourcesContent":["/**\n * @license\n * Copyright 2018 Google LLC. All Rights Reserved.\n * Licensed under the Apache License, Version 2.0 (the \"License\");\n * you may not use this file except in compliance with the License.\n * You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n *\n * =============================================================================\n */\n\nimport * as tf from '@tensorflow/tfjs-core';\nimport * as seedrandom from 'seedrandom';\n\nimport {Container} from '../types';\nimport {deepClone} from '../util/deep_clone';\nimport {deepMapAndAwaitAll, DeepMapAsyncResult, DeepMapResult, deepZip, zipToList} from '../util/deep_map';\nimport {GrowingRingBuffer} from '../util/growing_ring_buffer';\nimport {RingBuffer} from '../util/ring_buffer';\n\n/**\n * A nested structure of LazyIterators, used as the input to zip().\n */\nexport type IteratorContainer = Container<LazyIterator<tf.TensorContainer>>;\n\n// Here we implement a simple asynchronous iterator.\n// This lets us avoid using either third-party stream libraries or\n// recent TypeScript language support requiring polyfills.\n\n/**\n * Create a `LazyIterator` from an array of items.\n */\nexport function iteratorFromItems<T>(items: T[]): LazyIterator<T> {\n  return new ArrayIterator(items);\n}\n\n/**\n * Create a `LazyIterator` of incrementing integers.\n */\nexport function iteratorFromIncrementing(start: number): LazyIterator<number> {\n  let i = start;\n  return iteratorFromFunction(() => ({value: i++, done: false}));\n}\n\n/**\n * Create a `LazyIterator` from a function.\n *\n * ```js\n * let i = -1;\n * const func = () =>\n *    ++i < 5 ? {value: i, done: false} : {value: null, done: true};\n * const iter = tf.data.iteratorFromFunction(func);\n * await iter.forEachAsync(e => console.log(e));\n * ```\n *\n * @param func A function that produces data on each call.\n */\nexport function iteratorFromFunction<T>(\n    func: () =>\n        IteratorResult<T>| Promise<IteratorResult<T>>): LazyIterator<T> {\n  return new FunctionCallIterator(func);\n}\n\n/**\n * Create a `LazyIterator` by concatenating underlying streams, which are\n * themselves provided as a stream.\n *\n * This can also be thought of as a \"stream flatten\" operation.\n *\n * @param baseIterators A stream of streams to be concatenated.\n * @param baseErrorHandler An optional function that can intercept `Error`s\n *   raised during a `next()` call on the base stream.  This function can decide\n *   whether the error should be propagated, whether the error should be\n *   ignored, or whether the base stream should be terminated.\n */\nexport function iteratorFromConcatenated<T>(\n    baseIterators: LazyIterator<LazyIterator<T>>,\n    baseErrorHandler?: (e: Error) => boolean): LazyIterator<T> {\n  return new ChainedIterator(baseIterators, baseErrorHandler);\n}\n\n/**\n * Create a `LazyIterator` by concatenating streams produced by calling a\n * stream-generating function a given number of times.\n *\n * Since a `LazyIterator` is read-once, it cannot be repeated, but this\n * function can be used to achieve a similar effect:\n *\n *   LazyIterator.ofConcatenatedFunction(() => new MyIterator(), 6);\n *\n * @param iteratorFunc: A function that produces a new stream on each call.\n * @param count: The number of times to call the function.\n * @param baseErrorHandler An optional function that can intercept `Error`s\n *   raised during a `next()` call on the base stream.  This function can decide\n *   whether the error should be propagated, whether the error should be\n *   ignored, or whether the base stream should be terminated.\n */\nexport function iteratorFromConcatenatedFunction<T>(\n    iteratorFunc: () => IteratorResult<LazyIterator<T>>, count: number,\n    baseErrorHandler?: (e: Error) => boolean): LazyIterator<T> {\n  return iteratorFromConcatenated(\n      iteratorFromFunction(iteratorFunc).take(count), baseErrorHandler);\n}\n\n/**\n * Create a `LazyIterator` by zipping together an array, dict, or nested\n * structure of `LazyIterator`s (and perhaps additional constants).\n *\n * The underlying streams must provide elements in a consistent order such\n * that they correspond.\n *\n * Typically, the underlying streams should have the same number of\n * elements. If they do not, the behavior is determined by the\n * `mismatchMode` argument.\n *\n * The nested structure of the `iterators` argument determines the\n * structure of elements in the resulting iterator.\n *\n * @param iterators: An array or object containing LazyIterators at the\n * leaves.\n * @param mismatchMode: Determines what to do when one underlying iterator\n * is exhausted before the others.  `ZipMismatchMode.FAIL` (the default)\n * causes an error to be thrown in this case.  `ZipMismatchMode.SHORTEST`\n * causes the zipped iterator to terminate with the furst underlying\n * streams, so elements remaining on the longer streams are ignored.\n * `ZipMismatchMode.LONGEST` causes the zipped stream to continue, filling\n * in nulls for the exhausted streams, until all streams are exhausted.\n */\nexport function iteratorFromZipped<O extends tf.TensorContainer>(\n    iterators: IteratorContainer,\n    mismatchMode: ZipMismatchMode = ZipMismatchMode.FAIL): LazyIterator<O> {\n  return new ZipIterator<O>(iterators, mismatchMode);\n}\n\n/**\n * An asynchronous iterator, providing lazy access to a potentially\n * unbounded stream of elements.\n *\n * Iterator can be obtained from a dataset:\n * `const iter = await dataset.iterator();`\n */\nexport abstract class LazyIterator<T> {\n  // This class implements AsyncIterator<T>, but we have not yet set the\n  // TypeScript --downlevelIteration flag to enable that.\n\n  abstract summary(): string;\n\n  /**\n   * Returns a `Promise` for the next element in the stream.\n   *\n   * When an item can be provided successfully, the return value is\n   * `{value:T, done:false}`.\n   *\n   * Calling next() on a closed stream returns `{value:null, done:true}`.\n   */\n  abstract next(): Promise<IteratorResult<T>>;\n\n  /**\n   * Collect all remaining elements of a bounded stream into an array.\n   * Obviously this will succeed only for small streams that fit in memory.\n   * Useful for testing.\n   *\n   * @returns A Promise for an array of stream elements, which will resolve\n   *   when the stream is exhausted.\n   */\n  async toArray(): Promise<T[]> {\n    const result: T[] = [];\n    let x = await this.next();\n    while (!x.done) {\n      result.push(x.value);\n      x = await this.next();\n    }\n    return result;\n  }\n\n  /**\n   * Collect all elements of this dataset into an array with prefetching 100\n   * elements. This is useful for testing, because the prefetch changes the\n   * order in which the Promises are resolved along the processing pipeline.\n   * This may help expose bugs where results are dependent on the order of\n   * Promise resolution rather than on the logical order of the stream (i.e.,\n   * due to hidden mutable state).\n   *\n   * @returns A Promise for an array of stream elements, which will resolve\n   *   when the stream is exhausted.\n   */\n  async toArrayForTest(): Promise<T[]> {\n    const stream = this.prefetch(100);\n    const result: T[] = [];\n    let x = await stream.next();\n    while (!x.done) {\n      result.push(x.value);\n      x = await stream.next();\n    }\n    return result;\n  }\n\n  /**\n   * Draw items from the stream until it is exhausted.\n   *\n   * This can be useful when the stream has side effects but no output.  In\n   * that case, calling this function guarantees that the stream will be\n   * fully processed.\n   */\n  async resolveFully(): Promise<void> {\n    let x = await this.next();\n    while (!x.done) {\n      x = await this.next();\n    }\n  }\n\n  /**\n   * Draw items from the stream until it is exhausted, or a predicate fails.\n   *\n   * This can be useful when the stream has side effects but no output.  In\n   * that case, calling this function guarantees that the stream will be\n   * fully processed.\n   */\n  async resolveWhile(predicate: (r: T) => boolean): Promise<void> {\n    let x = await this.next();\n    let shouldContinue = predicate(x.value);\n    while ((!x.done) && shouldContinue) {\n      x = await this.next();\n      shouldContinue = predicate(x.value);\n    }\n  }\n\n  /**\n   * Handles errors thrown on this stream using a provided handler function.\n   *\n   * @param handler A function that handles any `Error` thrown during a `next()`\n   *   call and returns true if the stream should continue (dropping the failed\n   *   call) or false if the stream should quietly terminate.  If the handler\n   *   itself throws (or rethrows) an `Error`, that will be propagated.\n   *\n   * @returns A `LazyIterator` of elements passed through from upstream,\n   *   possibly filtering or terminating on upstream `next()` calls that\n   *   throw an `Error`.\n   */\n  handleErrors(handler: (error: Error) => boolean): LazyIterator<T> {\n    return new ErrorHandlingLazyIterator(this, handler);\n  }\n\n  // TODO(soergel): Implement reduce() etc.\n\n  /**\n   * Filters this stream according to `predicate`.\n   *\n   * @param predicate A function mapping a stream element to a boolean or a\n   * `Promise` for one.\n   *\n   * @returns A `LazyIterator` of elements for which the predicate was true.\n   */\n  filter(predicate: (value: T) => boolean): LazyIterator<T> {\n    return new FilterIterator(this, predicate);\n  }\n\n  /**\n   * Maps this stream through a 1-to-1 transform.\n   *\n   * @param transform A function mapping a stream element to a transformed\n   *   element.\n   *\n   * @returns A `LazyIterator` of transformed elements.\n   */\n  map<O>(transform: (value: T) => O): LazyIterator<O> {\n    return new MapIterator(this, transform);\n  }\n\n  /**\n   * Maps this stream through an async 1-to-1 transform.\n   *\n   * @param transform A function mapping a stream element to a `Promise` for a\n   *   transformed stream element.\n   *\n   * @returns A `LazyIterator` of transformed elements.\n   */\n  mapAsync<O>(transform: (value: T) => Promise<O>): LazyIterator<O> {\n    return new AsyncMapIterator(this, transform);\n  }\n\n  /**\n   * Maps this stream through a 1-to-1 transform, forcing serial execution.\n   *\n   * @param transform A function mapping a stream element to a transformed\n   *   element.\n   *\n   * @returns A `LazyIterator` of transformed elements.\n   */\n  serialMapAsync<O>(transform: (value: T) => Promise<O>): LazyIterator<O> {\n    return new AsyncMapIterator(this, transform).serial();\n  }\n\n  /**\n   * Maps this stream through a 1-to-many transform.\n   *\n   * @param transform A function mapping a stream element to an array of\n   *   transformed elements.\n   *\n   * @returns A `DataStream` of transformed elements.\n   */\n  flatmap<O>(transform: (value: T) => O[]): LazyIterator<O> {\n    return new FlatmapIterator(this, transform);\n  }\n\n  /**\n   * Apply a function to every element of the stream.\n   *\n   * @param f A function to apply to each stream element.\n   */\n  async forEachAsync(f: (value: T) => void): Promise<void> {\n    return this.map(f).resolveFully();\n  }\n\n  /**\n   * Apply a function to every element of the stream, forcing serial execution.\n   *\n   * @param f A function to apply to each stream element.  Should return 'true'\n   *   to indicate that the stream should continue, or 'false' to cause it to\n   *   terminate.\n   */\n  async serialForEach(f: (value: T) => Promise<boolean>): Promise<void> {\n    return this.serialMapAsync(f).resolveWhile(x => (x === true));\n  }\n\n  /**\n   * Groups elements into batches, represented as arrays of elements.\n   *\n   * We can think of the elements of this iterator as 'rows' (even if they are\n   * nested structures).  By the same token, consecutive values for a given\n   * key within the elements form a 'column'.  This matches the usual sense of\n   * 'row' and 'column' when processing tabular data (e.g., parsing a CSV).\n   *\n   * Thus, \"Row-major\" means that the resulting batch is simply a collection of\n   * rows: `[row1, row2, row3, ...]`.  This is contrast to the column-major\n   * form, which is needed for vectorized computation.\n   *\n   * @param batchSize The number of elements desired per batch.\n   * @param smallLastBatch Whether to emit the final batch when it has fewer\n   *   than batchSize elements. Default true.\n   * @returns A `LazyIterator` of batches of elements, represented as arrays\n   *   of the original element type.\n   */\n  rowMajorBatch(batchSize: number, smallLastBatch = true): LazyIterator<T[]> {\n    return new RowMajorBatchIterator(this, batchSize, smallLastBatch);\n  }\n\n  /**\n   * Groups elements into batches, represented in column-major form.\n   *\n   * We can think of the elements of this iterator as 'rows' (even if they are\n   * nested structures).  By the same token, consecutive values for a given\n   * key within the elements form a 'column'.  This matches the usual sense of\n   * 'row' and 'column' when processing tabular data (e.g., parsing a CSV).\n   *\n   * Thus, \"column-major\" means that the resulting batch is a (potentially\n   * nested) structure representing the columns.  Each column entry, then,\n   * contains a collection of the values found in that column for a range of\n   * input elements.  This representation allows for vectorized computation, in\n   * contrast to the row-major form.\n   *\n   * The inputs should all have the same nested structure (i.e., of arrays and\n   * dicts).  The result is a single object with the same nested structure,\n   * where the leaves are arrays collecting the values of the inputs at that\n   * location (or, optionally, the result of a custom function applied to those\n   * arrays).\n   *\n   * @param batchSize The number of elements desired per batch.\n   * @param smallLastBatch Whether to emit the final batch when it has fewer\n   *   than batchSize elements. Default true.\n   * @param zipFn: (optional) A function that expects an array of elements at a\n   *   single node of the object tree, and returns a `DeepMapResult`.  The\n   *   `DeepMapResult` either provides a result value for that node (i.e.,\n   *   representing the subtree), or indicates that the node should be processed\n   *   recursively.  The default zipFn recurses as far as possible and places\n   *   arrays at the leaves.\n   * @returns A `LazyIterator` of batches of elements, represented as an object\n   *   with collections at the leaves.\n   */\n  columnMajorBatch(\n      batchSize: number, smallLastBatch = true,\n      // tslint:disable-next-line:no-any\n      zipFn: (xs: any[]) => DeepMapResult = zipToList):\n      LazyIterator<tf.TensorContainer> {\n    // First collect the desired number of input elements as a row-major batch.\n    const rowBatches = this.rowMajorBatch(batchSize, smallLastBatch);\n    // Now 'rotate' or 'pivot' the data, collecting all values from each column\n    // in the batch (i.e., for each key within the elements) into an array.\n    return rowBatches.map(x => deepZip(x, zipFn));\n  }\n\n  /**\n   * Concatenate this `LazyIterator` with another.\n   *\n   * @param iterator A `LazyIterator` to be concatenated onto this one.\n   * @param baseErrorHandler An optional function that can intercept `Error`s\n   *   raised during a `next()` call on the base stream.  This function can\n   *   decide whether the error should be propagated, whether the error should\n   *   be ignored, or whether the base stream should be terminated.\n   * @returns A `LazyIterator`.\n   */\n  concatenate(\n      iterator: LazyIterator<T>,\n      baseErrorHandler?: (e: Error) => boolean): LazyIterator<T> {\n    return new ChainedIterator(\n        iteratorFromItems([this, iterator]), baseErrorHandler);\n  }\n\n  /**\n   * Limits this stream to return at most `count` items.\n   *\n   * @param count The maximum number of items to provide from the stream. If\n   * a negative or undefined value is given, the entire stream is returned\n   *   unaltered.\n   */\n  take(count: number): LazyIterator<T> {\n    if (count < 0 || count == null) {\n      return this;\n    }\n    return new TakeIterator(this, count);\n  }\n\n  /**\n   * Skips the first `count` items in this stream.\n   *\n   * @param count The number of items to skip.  If a negative or undefined\n   * value is given, the entire stream is returned unaltered.\n   */\n  skip(count: number): LazyIterator<T> {\n    if (count < 0 || count == null) {\n      return this;\n    }\n    return new SkipIterator(this, count);\n  }\n\n  /**\n   * Prefetch the first `bufferSize` items in this stream.\n   *\n   * Note this prefetches Promises, but makes no guarantees about when those\n   * Promises resolve.\n   *\n   * @param bufferSize: An integer specifying the number of elements to be\n   *   prefetched.\n   */\n  prefetch(bufferSize: number): LazyIterator<T> {\n    return new PrefetchIterator(this, bufferSize);\n  }\n\n  // TODO(soergel): deep sharded shuffle, where supported\n\n  /**\n   * Randomly shuffles the elements of this stream.\n   *\n   * @param bufferSize: An integer specifying the number of elements from\n   * this stream from which the new stream will sample.\n   * @param seed: (Optional.) An integer specifying the random seed that\n   * will be used to create the distribution.\n   */\n  shuffle(windowSize: number, seed?: string): LazyIterator<T> {\n    return new ShuffleIterator(this, windowSize, seed);\n  }\n\n  /**\n   * Force an iterator to execute serially: each next() call will await the\n   * prior one, so that they cannot execute concurrently.\n   */\n  serial(): LazyIterator<T> {\n    return new SerialIterator(this);\n  }\n}\n\n// ============================================================================\n// The following private classes serve to implement the chainable methods\n// on LazyIterator.  Unfortunately they can't be placed in separate files,\n// due to resulting trouble with circular imports.\n// ============================================================================\n\n// Iterators that just extend LazyIterator directly\n// ============================================================================\n\nclass ArrayIterator<T> extends LazyIterator<T> {\n  private trav = 0;\n  constructor(protected items: T[]) {\n    super();\n  }\n\n  summary() {\n    return `Array of ${this.items.length} items`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    if (this.trav >= this.items.length) {\n      return {value: null, done: true};\n    }\n    const item = this.items[this.trav];\n    this.trav++;\n    return {value: deepClone(item), done: false};\n  }\n}\n\nclass FunctionCallIterator<T> extends LazyIterator<T> {\n  constructor(\n      protected nextFn: () => IteratorResult<T>| Promise<IteratorResult<T>>) {\n    super();\n  }\n\n  summary() {\n    return `Function call`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    try {\n      return this.nextFn();\n    } catch (e) {\n      // Modify the error message but leave the stack trace intact\n      e.message =\n          `Error thrown while iterating through a dataset: ${e.message}`;\n      throw e;\n    }\n  }\n}\n\nclass SerialIterator<T> extends LazyIterator<T> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  constructor(protected upstream: LazyIterator<T>) {\n    super();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Serial`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  private async serialNext(): Promise<IteratorResult<T>> {\n    return this.upstream.next();\n  }\n}\n\nclass SkipIterator<T> extends LazyIterator<T> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  // Local state that should not be clobbered by out-of-order execution.\n  count = 0;\n\n  constructor(protected upstream: LazyIterator<T>, protected maxCount: number) {\n    super();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Skip`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  private async serialNext(): Promise<IteratorResult<T>> {\n    // TODO(soergel): consider tradeoffs of reading in parallel, eg.\n    // collecting next() promises in an Array and then waiting for\n    // Promise.all() of those. Benefit: pseudo-parallel execution.  Drawback:\n    // maybe delayed GC.\n    while (this.count++ < this.maxCount) {\n      const skipped = await this.upstream.next();\n      // short-circuit if upstream is already empty\n      if (skipped.done) {\n        return skipped;\n      }\n      tf.dispose(skipped.value as {});\n    }\n    return this.upstream.next();\n  }\n}\n\nclass TakeIterator<T> extends LazyIterator<T> {\n  count = 0;\n  constructor(protected upstream: LazyIterator<T>, protected maxCount: number) {\n    super();\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Take`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    if (this.count++ >= this.maxCount) {\n      return {value: null, done: true};\n    }\n    return this.upstream.next();\n  }\n}\n\n// Note this batch just groups items into row-wise element arrays.\n// Rotating these to a column-wise representation happens only at the dataset\n// level.\nclass RowMajorBatchIterator<T> extends LazyIterator<T[]> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T[]>>;\n\n  constructor(\n      protected upstream: LazyIterator<T>, protected batchSize: number,\n      protected enableSmallLastBatch = true) {\n    super();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> RowMajorBatch`;\n  }\n\n  async next(): Promise<IteratorResult<T[]>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  private async serialNext(): Promise<IteratorResult<T[]>> {\n    const batch: T[] = [];\n    while (batch.length < this.batchSize) {\n      const item = await this.upstream.next();\n      if (item.done) {\n        if (this.enableSmallLastBatch && batch.length > 0) {\n          return {value: batch, done: false};\n        }\n        return {value: null, done: true};\n      }\n      batch.push(item.value);\n    }\n    return {value: batch, done: false};\n  }\n}\n\nclass FilterIterator<T> extends LazyIterator<T> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  constructor(\n      protected upstream: LazyIterator<T>,\n      protected predicate: (value: T) => boolean) {\n    super();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Filter`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  private async serialNext(): Promise<IteratorResult<T>> {\n    while (true) {\n      const item = await this.upstream.next();\n      if (item.done || this.predicate(item.value)) {\n        return item;\n      }\n      tf.dispose(item.value as {});\n    }\n  }\n}\n\nclass MapIterator<I, O> extends LazyIterator<O> {\n  constructor(\n      protected upstream: LazyIterator<I>,\n      protected transform: (value: I) => O) {\n    super();\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Map`;\n  }\n\n  async next(): Promise<IteratorResult<O>> {\n    const item = await this.upstream.next();\n    if (item.done) {\n      return {value: null, done: true};\n    }\n    const inputTensors = tf.tensor_util.getTensorsInContainer(item.value as {});\n    // Careful: the transform may mutate the item in place.\n    // That's why we have to remember the input Tensors above, and then\n    // below dispose only those that were not passed through to the output.\n    // Note too that the transform function is responsible for tidying\n    // any intermediate Tensors.  Here we are concerned only about the\n    // inputs.\n    const mapped = this.transform(item.value);\n    const outputTensors = tf.tensor_util.getTensorsInContainer(mapped as {});\n\n    // TODO(soergel) faster intersection\n    // TODO(soergel) move to tf.disposeExcept(in, out)?\n    for (const t of inputTensors) {\n      if (!tf.tensor_util.isTensorInList(t, outputTensors)) {\n        t.dispose();\n      }\n    }\n    return {value: mapped, done: false};\n  }\n}\n\nclass ErrorHandlingLazyIterator<T> extends LazyIterator<T> {\n  count = 0;\n  constructor(\n      protected upstream: LazyIterator<T>,\n      protected handler: (error: Error) => boolean) {\n    super();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> handleErrors`;\n  }\n\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  async serialNext(): Promise<IteratorResult<T>> {\n    while (true) {\n      try {\n        return await this.upstream.next();\n      } catch (e) {\n        if (!this.handler(e)) {\n          return {value: null, done: true};\n        }\n        // If the handler returns true, loop and fetch the next upstream item.\n\n        // If the upstream iterator throws an endless stream of errors, and if\n        // the handler says to ignore them, then we loop forever here.  That is\n        // the correct behavior-- it's up to the handler to decide when to stop.\n      }\n    }\n  }\n}\n\nclass AsyncMapIterator<I, O> extends LazyIterator<O> {\n  constructor(\n      protected upstream: LazyIterator<I>,\n      protected transform: (value: I) => Promise<O>) {\n    super();\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> AsyncMap`;\n  }\n\n  async next(): Promise<IteratorResult<O>> {\n    const item = await this.upstream.next();\n    if (item.done) {\n      return {value: null, done: true};\n    }\n    const inputTensors = tf.tensor_util.getTensorsInContainer(item.value as {});\n    // Careful: the transform may mutate the item in place.\n    // That's why we have to remember the input Tensors above, and then\n    // below dispose only those that were not passed through to the output.\n    // Note too that the transform function is responsible for tidying\n    // any intermediate Tensors.  Here we are concerned only about the\n    // inputs.\n    const mapped = await this.transform(item.value);\n    const outputTensors = tf.tensor_util.getTensorsInContainer(mapped as {});\n\n    // TODO(soergel) faster intersection\n    // TODO(soergel) move to tf.disposeExcept(in, out)?\n    for (const t of inputTensors) {\n      if (!tf.tensor_util.isTensorInList(t, outputTensors)) {\n        t.dispose();\n      }\n    }\n    return {value: mapped, done: false};\n  }\n}\n\n// Iterators that maintain a queue of pending items\n// ============================================================================\n\n/**\n * A base class for transforming streams that operate by maintaining an\n * output queue of elements that are ready to return via next().  This is\n * commonly required when the transformation is 1-to-many:  A call to next()\n * may trigger a call to the underlying stream, which will produce many\n * mapped elements of this stream-- of which we need to return only one, so\n * we have to queue the rest.\n */\nexport abstract class OneToManyIterator<T> extends LazyIterator<T> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  // Local state that should not be clobbered by out-of-order execution.\n  protected outputQueue: RingBuffer<T>;\n\n  constructor() {\n    super();\n    this.outputQueue = new GrowingRingBuffer<T>();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  /**\n   * Read one or more chunks from upstream and process them, possibly\n   * reading or writing a carryover, and adding processed items to the\n   * output queue.  Note it's possible that no items are added to the queue\n   * on a given pump() call, even if the upstream stream is not closed\n   * (e.g., because items are filtered).\n   *\n   * @return `true` if any action was taken, i.e. fetching items from the\n   *   upstream source OR adding items to the output queue.  `false` if the\n   *   upstream source is exhausted AND nothing was added to the queue\n   * (i.e., any remaining carryover).\n   */\n  protected abstract pump(): Promise<boolean>;\n\n  async serialNext(): Promise<IteratorResult<T>> {\n    // Fetch so that the queue contains at least one item if possible.\n    // If the upstream source is exhausted, AND there are no items left in\n    // the output queue, then this stream is also exhausted.\n    while (this.outputQueue.length() === 0) {\n      // TODO(soergel): consider parallel reads.\n      if (!await this.pump()) {\n        return {value: null, done: true};\n      }\n    }\n    return {value: this.outputQueue.shift(), done: false};\n  }\n}\nclass FlatmapIterator<I, O> extends OneToManyIterator<O> {\n  constructor(\n      protected upstream: LazyIterator<I>,\n      protected transform: (value: I) => O[]) {\n    super();\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Flatmap`;\n  }\n\n  async pump(): Promise<boolean> {\n    const item = await this.upstream.next();\n    if (item.done) {\n      return false;\n    }\n    const inputTensors = tf.tensor_util.getTensorsInContainer(item.value as {});\n    // Careful: the transform may mutate the item in place.\n    // that's why we have to remember the input Tensors above, and then\n    // below dispose only those that were not passed through to the output.\n    // Note too that the transform function is responsible for tidying any\n    // intermediate Tensors.  Here we are concerned only about the inputs.\n    const mappedArray = this.transform(item.value);\n    const outputTensors =\n        tf.tensor_util.getTensorsInContainer(mappedArray as {});\n    this.outputQueue.pushAll(mappedArray);\n\n    // TODO(soergel) faster intersection, and deduplicate outputTensors\n    // TODO(soergel) move to tf.disposeExcept(in, out)?\n    for (const t of inputTensors) {\n      if (!tf.tensor_util.isTensorInList(t, outputTensors)) {\n        t.dispose();\n      }\n    }\n\n    return true;\n  }\n}\n\n/**\n * Provides a `LazyIterator` that concatenates a stream of underlying\n * streams.\n *\n * Doing this in a concurrency-safe way requires some trickery.  In\n * particular, we want this stream to return the elements from the\n * underlying streams in the correct order according to when next() was\n * called, even if the resulting Promises resolve in a different order.\n */\nexport class ChainedIterator<T> extends LazyIterator<T> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>> = null;\n\n  // Local state that should not be clobbered by out-of-order execution.\n  private iterator: LazyIterator<T> = null;\n  private moreIterators: LazyIterator<LazyIterator<T>>;\n\n  constructor(\n      iterators: LazyIterator<LazyIterator<T>>,\n      private readonly baseErrorHandler?: (e: Error) => boolean) {\n    super();\n    this.moreIterators = iterators;\n  }\n\n  summary() {\n    const upstreamSummaries = 'TODO: fill in upstream of chained summaries';\n    return `${upstreamSummaries} -> Chained`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    this.lastRead = this.readFromChain(this.lastRead);\n    return this.lastRead;\n  }\n\n  private async readFromChain(lastRead: Promise<IteratorResult<T>>):\n      Promise<IteratorResult<T>> {\n    // Must await on the previous read since the previous read may have advanced\n    // the stream of streams, from which we need to read.\n    // This is unfortunate since we can't parallelize reads. Which means\n    // prefetching of chained streams is a no-op.\n    // One solution is to prefetch immediately upstream of this.\n    await lastRead;\n    if (this.iterator == null) {\n      const iteratorResult = await this.moreIterators.next();\n      if (iteratorResult.done) {\n        // No more streams to stream from.\n        return {value: null, done: true};\n      }\n      this.iterator = iteratorResult.value;\n      if (this.baseErrorHandler != null) {\n        this.iterator = this.iterator.handleErrors(this.baseErrorHandler);\n      }\n    }\n    const itemResult = await this.iterator.next();\n    if (itemResult.done) {\n      this.iterator = null;\n      return this.readFromChain(lastRead);\n    }\n    return itemResult;\n  }\n}\n\nexport enum ZipMismatchMode {\n  FAIL,      // require zipped streams to have the same length\n  SHORTEST,  // terminate zip when the first stream is exhausted\n  LONGEST    // use nulls for exhausted streams; use up the longest stream.\n}\n\n/**\n * Provides a `LazyIterator` that zips together an array, dict, or nested\n * structure of `LazyIterator`s (and perhaps additional constants).\n *\n * The underlying streams must provide elements in a consistent order such\n * that they correspond.\n *\n * Typically, the underlying streams should have the same number of\n * elements. If they do not, the behavior is determined by the\n * `mismatchMode` argument.\n *\n * The nested structure of the `iterators` argument determines the\n * structure of elements in the resulting iterator.\n *\n * Doing this in a concurrency-safe way requires some trickery.  In\n * particular, we want this stream to return the elements from the\n * underlying streams in the correct order according to when next() was\n * called, even if the resulting Promises resolve in a different order.\n *\n * @param iterators: An array or object containing LazyIterators at the\n * leaves.\n * @param mismatchMode: Determines what to do when one underlying iterator\n * is exhausted before the others.  `ZipMismatchMode.FAIL` (the default)\n * causes an error to be thrown in this case.  `ZipMismatchMode.SHORTEST`\n * causes the zipped iterator to terminate with the furst underlying\n * streams, so elements remaining on the longer streams are ignored.\n * `ZipMismatchMode.LONGEST` causes the zipped stream to continue, filling\n * in nulls for the exhausted streams, until all streams are exhausted.\n */\nclass ZipIterator<O extends tf.TensorContainer> extends LazyIterator<O> {\n  private count = 0;\n  private currentPromise: Promise<IteratorResult<O>> = null;\n\n  constructor(\n      protected readonly iterators: IteratorContainer,\n      protected readonly mismatchMode: ZipMismatchMode = ZipMismatchMode.FAIL) {\n    super();\n  }\n\n  summary() {\n    const upstreamSummaries = 'TODO: fill in upstream of zip summaries';\n    return `{${upstreamSummaries}} -> Zip`;\n  }\n\n  private async nextState(afterState: Promise<IteratorResult<O>>):\n      Promise<IteratorResult<O>> {\n    // This chaining ensures that the underlying next() are not even called\n    // before the previous ones have resolved.\n    await afterState;\n\n    // Collect underlying iterator \"done\" signals as a side effect in\n    // getNext()\n    let numIterators = 0;\n    let iteratorsDone = 0;\n\n    function getNext(container: IteratorContainer): DeepMapAsyncResult {\n      if (container instanceof LazyIterator) {\n        const result = container.next();\n        return {\n          value: result.then(x => {\n            numIterators++;\n            if (x.done) {\n              iteratorsDone++;\n            }\n            return x.value;\n          }),\n          recurse: false\n        };\n      } else {\n        return {value: null, recurse: true};\n      }\n    }\n\n    const mapped: O = await deepMapAndAwaitAll(this.iterators, getNext);\n\n    if (numIterators === iteratorsDone) {\n      // The streams have all ended.\n      return {value: null, done: true};\n    }\n    if (iteratorsDone > 0) {\n      switch (this.mismatchMode) {\n        case ZipMismatchMode.FAIL:\n          throw new Error(\n              'Zipped streams should have the same length. ' +\n              `Mismatched at element ${this.count}.`);\n        case ZipMismatchMode.SHORTEST:\n          return {value: null, done: true};\n        case ZipMismatchMode.LONGEST:\n        default:\n          // Continue.  The exhausted streams already produced value: null.\n      }\n    }\n\n    this.count++;\n    return {value: mapped, done: false};\n  }\n\n  async next(): Promise<IteratorResult<O>> {\n    this.currentPromise = this.nextState(this.currentPromise);\n    return this.currentPromise;\n  }\n}\n\n// Iterators that maintain a ring buffer of pending promises\n// ============================================================================\n\n/**\n * A stream that prefetches a given number of items from an upstream source,\n * returning them in FIFO order.\n *\n * Note this prefetches Promises, but makes no guarantees about when those\n * Promises resolve.\n */\nexport class PrefetchIterator<T> extends LazyIterator<T> {\n  protected buffer: RingBuffer<Promise<IteratorResult<T>>>;\n\n  constructor(\n      protected upstream: LazyIterator<T>, protected bufferSize: number) {\n    super();\n    this.buffer = new RingBuffer<Promise<IteratorResult<T>>>(bufferSize);\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Prefetch`;\n  }\n\n  /**\n   * Refill the prefetch buffer.  Returns only after the buffer is full, or\n   * the upstream source is exhausted.\n   */\n  protected refill() {\n    while (!this.buffer.isFull()) {\n      const v = this.upstream.next();\n      this.buffer.push(v);\n    }\n  }\n\n  next(): Promise<IteratorResult<T>> {\n    this.refill();\n    // This shift will never throw an error because the buffer is always\n    // full after a refill. If the stream is exhausted, the buffer will be\n    // full of Promises that will resolve to the end-of-stream signal.\n    return this.buffer.shift();\n  }\n}\n\n/**\n * A stream that performs a sliding-window random shuffle on an upstream\n * source. This is like a `PrefetchIterator` except that the items are\n * returned in randomized order.  Mixing naturally improves as the buffer\n * size increases.\n */\nexport class ShuffleIterator<T> extends PrefetchIterator<T> {\n  private readonly random: seedrandom.prng;\n\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  // Local state that should not be clobbered by out-of-order execution.\n  private upstreamExhausted = false;\n\n  constructor(\n    protected override upstream: LazyIterator<T>, protected windowSize: number,\n      seed?: string) {\n    super(upstream, windowSize);\n    this.random = seedrandom.alea(seed || tf.util.now().toString());\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  override async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  private randomInt(max: number) {\n    return Math.floor(this.random() * max);\n  }\n\n  protected chooseIndex(): number {\n    return this.randomInt(this.buffer.length());\n  }\n\n  async serialNext(): Promise<IteratorResult<T>> {\n    // TODO(soergel): consider performance\n    if (!this.upstreamExhausted) {\n      this.refill();\n    }\n    while (!this.buffer.isEmpty()) {\n      const chosenIndex = this.chooseIndex();\n      const result = await this.buffer.shuffleExcise(chosenIndex);\n      if (result.done) {\n        this.upstreamExhausted = true;\n      } else {\n        this.refill();\n        return result;\n      }\n    }\n    return {value: null, done: true};\n  }\n}\n"]}
|