/**
 * @license
 * Copyright 2018 Google LLC. All Rights Reserved.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * =============================================================================
 */
import * as tf from '@tensorflow/tfjs-core';
import * as seedrandom from 'seedrandom';
import { deepClone } from '../util/deep_clone';
import { deepMapAndAwaitAll, deepZip, zipToList } from '../util/deep_map';
import { GrowingRingBuffer } from '../util/growing_ring_buffer';
import { RingBuffer } from '../util/ring_buffer';
// Here we implement a simple asynchronous iterator.
// This lets us avoid using either third-party stream libraries or
// recent TypeScript language support requiring polyfills.
/**
 * Create a `LazyIterator` from an array of items.
 *
 * @param items The array whose elements the iterator yields, in order.
 * @returns A `LazyIterator` over `items`.
 */
export function iteratorFromItems(items) {
    return new ArrayIterator(items);
}
/**
 * Create a `LazyIterator` of incrementing integers.
 *
 * The resulting stream never reports `done`, so it is unbounded.
 *
 * @param start The first value to emit; each later element is one greater.
 * @returns A `LazyIterator` of consecutive numbers.
 */
export function iteratorFromIncrementing(start) {
    let i = start;
    return iteratorFromFunction(() => ({ value: i++, done: false }));
}
/**
 * Create a `LazyIterator` from a function.
 *
 * ```js
 * let i = -1;
 * const func = () =>
 *    ++i < 5 ? {value: i, done: false} : {value: null, done: true};
 * const iter = tf.data.iteratorFromFunction(func);
 * await iter.forEachAsync(e => console.log(e));
 * ```
 *
 * @param func A function that produces data on each call. Its return value
 *   is used directly as the result of `next()`, so it must be a
 *   `{value, done}` object (or a `Promise` for one).
 * @returns A `LazyIterator` whose `next()` delegates to `func`.
 */
export function iteratorFromFunction(func) {
    return new FunctionCallIterator(func);
}
/**
 * Create a `LazyIterator` by concatenating underlying streams, which are
 * themselves provided as a stream.
 *
 * This can also be thought of as a "stream flatten" operation.
 *
 * @param baseIterators A stream of streams to be concatenated.
 * @param baseErrorHandler An optional function that can intercept `Error`s
 *   raised during a `next()` call on the base stream.  This function can decide
 *   whether the error should be propagated, whether the error should be
 *   ignored, or whether the base stream should be terminated.
 * @returns A `LazyIterator` yielding the elements of each base stream in turn.
 */
export function iteratorFromConcatenated(baseIterators, baseErrorHandler) {
    return new ChainedIterator(baseIterators, baseErrorHandler);
}
/**
 * Create a `LazyIterator` by concatenating streams produced by calling a
 * stream-generating function a given number of times.
 *
 * Since a `LazyIterator` is read-once, it cannot be repeated, but this
 * function can be used to achieve a similar effect:
 *
 *   iteratorFromConcatenatedFunction(
 *       () => ({value: new MyIterator(), done: false}), 6);
 *
 * @param iteratorFunc: A function that produces a new stream on each call.
 *   Because it is wrapped with `iteratorFromFunction`, it must return a
 *   `{value, done}` object whose `value` is the new stream.
 * @param count: The number of times to call the function.
 * @param baseErrorHandler An optional function that can intercept `Error`s
 *   raised during a `next()` call on the base stream.  This function can decide
 *   whether the error should be propagated, whether the error should be
 *   ignored, or whether the base stream should be terminated.
 * @returns A `LazyIterator` over the concatenation of the produced streams.
 */
export function iteratorFromConcatenatedFunction(iteratorFunc, count, baseErrorHandler) {
    return iteratorFromConcatenated(iteratorFromFunction(iteratorFunc).take(count), baseErrorHandler);
}
/**
 * Create a `LazyIterator` by zipping together an array, dict, or nested
 * structure of `LazyIterator`s (and perhaps additional constants).
 *
 * The underlying streams must provide elements in a consistent order such
 * that they correspond.
 *
 * Typically, the underlying streams should have the same number of
 * elements. If they do not, the behavior is determined by the
 * `mismatchMode` argument.
 *
 * The nested structure of the `iterators` argument determines the
 * structure of elements in the resulting iterator.
 *
 * @param iterators: An array or object containing LazyIterators at the
 *   leaves.
 * @param mismatchMode: Determines what to do when one underlying iterator
 *   is exhausted before the others.  `ZipMismatchMode.FAIL` (the default)
 *   causes an error to be thrown in this case.  `ZipMismatchMode.SHORTEST`
 *   causes the zipped iterator to terminate with the first underlying
 *   streams, so elements remaining on the longer streams are ignored.
 *   `ZipMismatchMode.LONGEST` causes the zipped stream to continue, filling
 *   in nulls for the exhausted streams, until all streams are exhausted.
 * @returns A `LazyIterator` of zipped elements.
 */
export function iteratorFromZipped(iterators, mismatchMode = ZipMismatchMode.FAIL) {
    return new ZipIterator(iterators, mismatchMode);
}
/**
 * An asynchronous iterator, providing lazy access to a potentially
 * unbounded stream of elements.
 *
 * Iterator can be obtained from a dataset:
 * `const iter = await dataset.iterator();`
 *
 * This base class only provides the chainable combinators; it relies on each
 * concrete subclass to supply `next()`, which resolves to a `{value, done}`
 * result.
 */
export class LazyIterator {
    /**
     * Collect all remaining elements of a bounded stream into an array.
     * Obviously this will succeed only for small streams that fit in memory.
     * Useful for testing.
     *
     * @returns A Promise for an array of stream elements, which will resolve
     *   when the stream is exhausted.
     */
    async toArray() {
        const result = [];
        let x = await this.next();
        while (!x.done) {
            result.push(x.value);
            x = await this.next();
        }
        return result;
    }
    /**
     * Collect all elements of this dataset into an array with prefetching 100
     * elements. This is useful for testing, because the prefetch changes the
     * order in which the Promises are resolved along the processing pipeline.
     * This may help expose bugs where results are dependent on the order of
     * Promise resolution rather than on the logical order of the stream (i.e.,
     * due to hidden mutable state).
     *
     * @returns A Promise for an array of stream elements, which will resolve
     *   when the stream is exhausted.
     */
    async toArrayForTest() {
        // Same collection loop as toArray(), just run on the prefetching
        // wrapper instead of on this iterator directly.
        return this.prefetch(100).toArray();
    }
    /**
     * Draw items from the stream until it is exhausted.
     *
     * This can be useful when the stream has side effects but no output.  In
     * that case, calling this function guarantees that the stream will be
     * fully processed.
     */
    async resolveFully() {
        let x = await this.next();
        while (!x.done) {
            x = await this.next();
        }
    }
    /**
     * Draw items from the stream until it is exhausted, or a predicate fails.
     *
     * This can be useful when the stream has side effects but no output and
     * processing should stop early once some condition no longer holds.
     *
     * @param predicate A function applied to each element drawn; reading stops
     *   as soon as it returns a falsy value.  It is only ever called with
     *   real elements, never with the end-of-stream marker.
     */
    async resolveWhile(predicate) {
        let x = await this.next();
        // Check `done` first: the terminal result carries `value: null`, and
        // handing that to the predicate would make any predicate that
        // dereferences its argument throw at the end of the stream.
        while (!x.done && predicate(x.value)) {
            x = await this.next();
        }
    }
    /**
     * Handles errors thrown on this stream using a provided handler function.
     *
     * @param handler A function that handles any `Error` thrown during a `next()`
     *   call and returns true if the stream should continue (dropping the failed
     *   call) or false if the stream should quietly terminate.  If the handler
     *   itself throws (or rethrows) an `Error`, that will be propagated.
     *
     * @returns A `LazyIterator` of elements passed through from upstream,
     *   possibly filtering or terminating on upstream `next()` calls that
     *   throw an `Error`.
     */
    handleErrors(handler) {
        return new ErrorHandlingLazyIterator(this, handler);
    }
    // TODO(soergel): Implement reduce() etc.
    /**
     * Filters this stream according to `predicate`.
     *
     * @param predicate A function mapping a stream element to a boolean or a
     * `Promise` for one.
     *
     * @returns A `LazyIterator` of elements for which the predicate was true.
     */
    filter(predicate) {
        return new FilterIterator(this, predicate);
    }
    /**
     * Maps this stream through a 1-to-1 transform.
     *
     * @param transform A function mapping a stream element to a transformed
     *   element.
     *
     * @returns A `LazyIterator` of transformed elements.
     */
    map(transform) {
        return new MapIterator(this, transform);
    }
    /**
     * Maps this stream through an async 1-to-1 transform.
     *
     * @param transform A function mapping a stream element to a `Promise` for a
     *   transformed stream element.
     *
     * @returns A `LazyIterator` of transformed elements.
     */
    mapAsync(transform) {
        return new AsyncMapIterator(this, transform);
    }
    /**
     * Maps this stream through a 1-to-1 transform, forcing serial execution.
     *
     * @param transform A function mapping a stream element to a transformed
     *   element.
     *
     * @returns A `LazyIterator` of transformed elements.
     */
    serialMapAsync(transform) {
        return new AsyncMapIterator(this, transform).serial();
    }
    /**
     * Maps this stream through a 1-to-many transform.
     *
     * @param transform A function mapping a stream element to an array of
     *   transformed elements.
     *
     * @returns A `LazyIterator` of transformed elements.
     */
    flatmap(transform) {
        return new FlatmapIterator(this, transform);
    }
    /**
     * Apply a function to every element of the stream.
     *
     * @param f A function to apply to each stream element.
     */
    async forEachAsync(f) {
        return this.map(f).resolveFully();
    }
    /**
     * Apply a function to every element of the stream, forcing serial execution.
     *
     * @param f A function to apply to each stream element.  Should return 'true'
     *   to indicate that the stream should continue, or 'false' to cause it to
     *   terminate.
     */
    async serialForEach(f) {
        return this.serialMapAsync(f).resolveWhile(x => (x === true));
    }
    /**
     * Groups elements into batches, represented as arrays of elements.
     *
     * We can think of the elements of this iterator as 'rows' (even if they are
     * nested structures).  By the same token, consecutive values for a given
     * key within the elements form a 'column'.  This matches the usual sense of
     * 'row' and 'column' when processing tabular data (e.g., parsing a CSV).
     *
     * Thus, "Row-major" means that the resulting batch is simply a collection of
     * rows: `[row1, row2, row3, ...]`.  This is in contrast to the column-major
     * form, which is needed for vectorized computation.
     *
     * @param batchSize The number of elements desired per batch.
     * @param smallLastBatch Whether to emit the final batch when it has fewer
     *   than batchSize elements. Default true.
     * @returns A `LazyIterator` of batches of elements, represented as arrays
     *   of the original element type.
     */
    rowMajorBatch(batchSize, smallLastBatch = true) {
        return new RowMajorBatchIterator(this, batchSize, smallLastBatch);
    }
    /**
     * Groups elements into batches, represented in column-major form.
     *
     * We can think of the elements of this iterator as 'rows' (even if they are
     * nested structures).  By the same token, consecutive values for a given
     * key within the elements form a 'column'.  This matches the usual sense of
     * 'row' and 'column' when processing tabular data (e.g., parsing a CSV).
     *
     * Thus, "column-major" means that the resulting batch is a (potentially
     * nested) structure representing the columns.  Each column entry, then,
     * contains a collection of the values found in that column for a range of
     * input elements.  This representation allows for vectorized computation, in
     * contrast to the row-major form.
     *
     * The inputs should all have the same nested structure (i.e., of arrays and
     * dicts).  The result is a single object with the same nested structure,
     * where the leaves are arrays collecting the values of the inputs at that
     * location (or, optionally, the result of a custom function applied to those
     * arrays).
     *
     * @param batchSize The number of elements desired per batch.
     * @param smallLastBatch Whether to emit the final batch when it has fewer
     *   than batchSize elements. Default true.
     * @param zipFn: (optional) A function that expects an array of elements at a
     *   single node of the object tree, and returns a `DeepMapResult`.  The
     *   `DeepMapResult` either provides a result value for that node (i.e.,
     *   representing the subtree), or indicates that the node should be processed
     *   recursively.  The default zipFn recurses as far as possible and places
     *   arrays at the leaves.
     * @returns A `LazyIterator` of batches of elements, represented as an object
     *   with collections at the leaves.
     */
    columnMajorBatch(batchSize, smallLastBatch = true, 
    // tslint:disable-next-line:no-any
    zipFn = zipToList) {
        // First collect the desired number of input elements as a row-major batch.
        const rowBatches = this.rowMajorBatch(batchSize, smallLastBatch);
        // Now 'rotate' or 'pivot' the data, collecting all values from each column
        // in the batch (i.e., for each key within the elements) into an array.
        return rowBatches.map(x => deepZip(x, zipFn));
    }
    /**
     * Concatenate this `LazyIterator` with another.
     *
     * @param iterator A `LazyIterator` to be concatenated onto this one.
     * @param baseErrorHandler An optional function that can intercept `Error`s
     *   raised during a `next()` call on the base stream.  This function can
     *   decide whether the error should be propagated, whether the error should
     *   be ignored, or whether the base stream should be terminated.
     * @returns A `LazyIterator`.
     */
    concatenate(iterator, baseErrorHandler) {
        return new ChainedIterator(iteratorFromItems([this, iterator]), baseErrorHandler);
    }
    /**
     * Limits this stream to return at most `count` items.
     *
     * @param count The maximum number of items to provide from the stream. If
     * a negative or undefined value is given, the entire stream is returned
     * unaltered.
     */
    take(count) {
        if (count < 0 || count == null) {
            return this;
        }
        return new TakeIterator(this, count);
    }
    /**
     * Skips the first `count` items in this stream.
     *
     * @param count The number of items to skip.  If a negative or undefined
     * value is given, the entire stream is returned unaltered.
     */
    skip(count) {
        if (count < 0 || count == null) {
            return this;
        }
        return new SkipIterator(this, count);
    }
    /**
     * Prefetch the first `bufferSize` items in this stream.
     *
     * Note this prefetches Promises, but makes no guarantees about when those
     * Promises resolve.
     *
     * @param bufferSize: An integer specifying the number of elements to be
     *   prefetched.
     */
    prefetch(bufferSize) {
        return new PrefetchIterator(this, bufferSize);
    }
    // TODO(soergel): deep sharded shuffle, where supported
    /**
     * Randomly shuffles the elements of this stream.
     *
     * @param windowSize: An integer specifying the number of elements from
     *   this stream from which the new stream will sample.
     * @param seed: (Optional.) An integer specifying the random seed that
     *   will be used to create the distribution.
     */
    shuffle(windowSize, seed) {
        return new ShuffleIterator(this, windowSize, seed);
    }
    /**
     * Force an iterator to execute serially: each next() call will await the
     * prior one, so that they cannot execute concurrently.
     */
    serial() {
        return new SerialIterator(this);
    }
}
// ============================================================================
// The following private classes serve to implement the chainable methods
// on LazyIterator.  Unfortunately they can't be placed in separate files,
// due to resulting trouble with circular imports.
// ============================================================================
// Iterators that just extend LazyIterator directly
// ============================================================================
/**
 * Iterates over a fixed array, yielding a deep clone of each item in order.
 */
class ArrayIterator extends LazyIterator {
    /**
     * @param items The array to traverse.
     */
    constructor(items) {
        super();
        this.items = items;
        // Index of the next item to return.
        this.trav = 0;
    }
    summary() {
        return `Array of ${this.items.length} items`;
    }
    /**
     * @returns A Promise for the next item (cloned via `deepClone`), or for
     *   the end-of-stream marker once every item has been returned.
     */
    async next() {
        if (this.trav >= this.items.length) {
            return { value: null, done: true };
        }
        const item = this.items[this.trav];
        this.trav++;
        return { value: deepClone(item), done: false };
    }
}
/**
 * Iterates by calling a user-supplied function once per `next()` call and
 * returning whatever `{value, done}` result it produces.
 */
class FunctionCallIterator extends LazyIterator {
    /**
     * @param nextFn A function returning a `{value, done}` result, or a
     *   Promise for one.
     */
    constructor(nextFn) {
        super();
        this.nextFn = nextFn;
    }
    summary() {
        return `Function call`;
    }
    /**
     * Calls `nextFn` and returns its result.
     *
     * Failures, whether thrown synchronously or delivered as a rejected
     * Promise, are rethrown with a message prefix identifying dataset
     * iteration as the context.
     */
    async next() {
        try {
            // `await` here (rather than returning the bare call) so that a
            // rejected Promise from an async nextFn is routed through the
            // catch below, just like a synchronous throw.
            return await this.nextFn();
        }
        catch (e) {
            // Modify the error message but leave the stack trace intact.
            // Only objects can carry a message: assigning a property on a
            // thrown primitive would itself raise a TypeError in strict
            // (module) code and hide the original failure.
            if (e !== null && typeof e === 'object') {
                e.message = `Error thrown while iterating through a dataset: ${e.message}`;
            }
            throw e;
        }
    }
}
/**
 * Wraps an upstream iterator so that its `next()` calls run strictly one
 * after another, even when the consumer issues several calls at once.
 */
class SerialIterator extends LazyIterator {
    /**
     * @param upstream The iterator whose reads should be serialized.
     */
    constructor(upstream) {
        super();
        this.upstream = upstream;
        // Tail of the chain of reads; each new read is appended to it.
        this.lastRead = Promise.resolve({ value: null, done: false });
    }
    summary() {
        return `${this.upstream.summary()} -> Serial`;
    }
    async next() {
        // This sets this.lastRead to a new Promise right away, as opposed to
        // saying `await this.lastRead; this.lastRead = this.serialNext();` which
        // would not work because this.lastRead would be updated only after the
        // promise resolves.
        this.lastRead = this.lastRead.then(() => this.serialNext());
        return this.lastRead;
    }
    /** Performs one upstream read; only invoked once the prior read is done. */
    async serialNext() {
        return this.upstream.next();
    }
}
/**
 * Drops the first `maxCount` upstream elements, then passes the rest through.
 * Tensors contained in dropped elements are disposed.
 */
class SkipIterator extends LazyIterator {
    /**
     * @param upstream The iterator to read from.
     * @param maxCount The number of leading elements to discard.
     */
    constructor(upstream, maxCount) {
        super();
        this.upstream = upstream;
        this.maxCount = maxCount;
        // Local state that should not be clobbered by out-of-order execution.
        this.count = 0;
        this.lastRead = Promise.resolve({ value: null, done: false });
    }
    summary() {
        return `${this.upstream.summary()} -> Skip`;
    }
    async next() {
        // This sets this.lastRead to a new Promise right away, as opposed to
        // saying `await this.lastRead; this.lastRead = this.serialNext();` which
        // would not work because this.lastRead would be updated only after the
        // promise resolves.
        this.lastRead = this.lastRead.then(() => this.serialNext());
        return this.lastRead;
    }
    /**
     * Discards any elements that still need skipping, then returns the next
     * upstream result.
     */
    async serialNext() {
        // TODO(soergel): consider tradeoffs of reading in parallel, eg.
        // collecting next() promises in an Array and then waiting for
        // Promise.all() of those. Benefit: pseudo-parallel execution.  Drawback:
        // maybe delayed GC.
        while (this.count++ < this.maxCount) {
            const skipped = await this.upstream.next();
            // short-circuit if upstream is already empty
            if (skipped.done) {
                return skipped;
            }
            tf.dispose(skipped.value);
        }
        return this.upstream.next();
    }
}
/**
 * Passes through at most `maxCount` upstream elements and then reports the
 * end of the stream without reading upstream any further.
 */
class TakeIterator extends LazyIterator {
    /**
     * @param upstream The iterator to read from.
     * @param maxCount The maximum number of elements to emit.
     */
    constructor(upstream, maxCount) {
        super();
        this.upstream = upstream;
        this.maxCount = maxCount;
        // Number of next() calls made so far.
        this.count = 0;
    }
    summary() {
        return `${this.upstream.summary()} -> Take`;
    }
    async next() {
        if (this.count++ >= this.maxCount) {
            return { value: null, done: true };
        }
        return this.upstream.next();
    }
}
// Note this batch just groups items into row-wise element arrays.
// Rotating these to a column-wise representation happens only at the dataset
// level.
/**
 * Groups consecutive upstream elements into arrays of `batchSize` elements.
 */
class RowMajorBatchIterator extends LazyIterator {
    /**
     * @param upstream The iterator to read from.
     * @param batchSize The number of elements desired per batch.
     * @param enableSmallLastBatch Whether to emit a final batch that has
     *   fewer than `batchSize` elements. Default true.
     */
    constructor(upstream, batchSize, enableSmallLastBatch = true) {
        super();
        this.upstream = upstream;
        this.batchSize = batchSize;
        this.enableSmallLastBatch = enableSmallLastBatch;
        this.lastRead = Promise.resolve({ value: null, done: false });
    }
    summary() {
        return `${this.upstream.summary()} -> RowMajorBatch`;
    }
    async next() {
        // This sets this.lastRead to a new Promise right away, as opposed to
        // saying `await this.lastRead; this.lastRead = this.serialNext();` which
        // would not work because this.lastRead would be updated only after the
        // promise resolves.
        this.lastRead = this.lastRead.then(() => this.serialNext());
        return this.lastRead;
    }
    /**
     * Reads upstream until a full batch is collected or upstream ends.
     * A partial batch is emitted only when `enableSmallLastBatch` is set.
     */
    async serialNext() {
        const batch = [];
        while (batch.length < this.batchSize) {
            const item = await this.upstream.next();
            if (item.done) {
                if (this.enableSmallLastBatch && batch.length > 0) {
                    return { value: batch, done: false };
                }
                return { value: null, done: true };
            }
            batch.push(item.value);
        }
        return { value: batch, done: false };
    }
}
/**
 * Passes through only the upstream elements accepted by a predicate.
 * Tensors contained in rejected elements are disposed.
 */
class FilterIterator extends LazyIterator {
    /**
     * @param upstream The iterator to read from.
     * @param predicate A function mapping an element to a boolean, or to a
     *   `Promise` for one.
     */
    constructor(upstream, predicate) {
        super();
        this.upstream = upstream;
        this.predicate = predicate;
        this.lastRead = Promise.resolve({ value: null, done: false });
    }
    summary() {
        return `${this.upstream.summary()} -> Filter`;
    }
    async next() {
        // This sets this.lastRead to a new Promise right away, as opposed to
        // saying `await this.lastRead; this.lastRead = this.serialNext();` which
        // would not work because this.lastRead would be updated only after the
        // promise resolves.
        this.lastRead = this.lastRead.then(() => this.serialNext());
        return this.lastRead;
    }
    /**
     * Reads upstream until an element passes the predicate or upstream ends.
     */
    async serialNext() {
        while (true) {
            const item = await this.upstream.next();
            // The predicate result is awaited so that a Promise-returning
            // predicate is judged by its resolved value; a bare Promise is
            // always truthy and would let every element through.
            if (item.done || await this.predicate(item.value)) {
                return item;
            }
            tf.dispose(item.value);
        }
    }
}
/**
 * Applies a synchronous 1-to-1 transform to each upstream element.
 */
class MapIterator extends LazyIterator {
    /**
     * @param upstream The iterator to read from.
     * @param transform A function mapping an element to its replacement.
     */
    constructor(upstream, transform) {
        super();
        this.upstream = upstream;
        this.transform = transform;
    }
    summary() {
        return `${this.upstream.summary()} -> Map`;
    }
    async next() {
        const item = await this.upstream.next();
        if (item.done) {
            return { value: null, done: true };
        }
        const inputTensors = tf.tensor_util.getTensorsInContainer(item.value);
        // Careful: the transform may mutate the item in place.
        // That's why we have to remember the input Tensors above, and then
        // below dispose only those that were not passed through to the output.
        // Note too that the transform function is responsible for tidying
        // any intermediate Tensors.  Here we are concerned only about the
        // inputs.
        const mapped = this.transform(item.value);
        const outputTensors = tf.tensor_util.getTensorsInContainer(mapped);
        // TODO(soergel) faster intersection
        // TODO(soergel) move to tf.disposeExcept(in, out)?
        for (const t of inputTensors) {
            if (!tf.tensor_util.isTensorInList(t, outputTensors)) {
                t.dispose();
            }
        }
        return { value: mapped, done: false };
    }
}
/**
 * Wraps an upstream iterator and routes errors thrown by its `next()` to a
 * handler, which decides whether to keep reading or to end the stream.
 */
class ErrorHandlingLazyIterator extends LazyIterator {
    /**
     * @param upstream The iterator to read from.
     * @param handler Called with each error thrown by `upstream.next()`.
     *   Returning true retries with the next upstream read; returning false
     *   ends this stream.  An error thrown by the handler propagates.
     */
    constructor(upstream, handler) {
        super();
        this.upstream = upstream;
        this.handler = handler;
        this.count = 0;
        this.lastRead = Promise.resolve({ value: null, done: false });
    }
    summary() {
        return `${this.upstream.summary()} -> handleErrors`;
    }
    async next() {
        // This sets this.lastRead to a new Promise right away, as opposed to
        // saying `await this.lastRead; this.lastRead = this.serialNext();` which
        // would not work because this.lastRead would be updated only after the
        // promise resolves.
        this.lastRead = this.lastRead.then(() => this.serialNext());
        return this.lastRead;
    }
    async serialNext() {
        while (true) {
            try {
                return await this.upstream.next();
            }
            catch (e) {
                if (!this.handler(e)) {
                    return { value: null, done: true };
                }
                // If the handler returns true, loop and fetch the next upstream item.
                // If the upstream iterator throws an endless stream of errors, and if
                // the handler says to ignore them, then we loop forever here.  That is
                // the correct behavior-- it's up to the handler to decide when to stop.
            }
        }
    }
}
/**
 * Applies an asynchronous 1-to-1 transform to each upstream element.
 */
class AsyncMapIterator extends LazyIterator {
    /**
     * @param upstream The iterator to read from.
     * @param transform A function mapping an element to a `Promise` for its
     *   replacement.
     */
    constructor(upstream, transform) {
        super();
        this.upstream = upstream;
        this.transform = transform;
    }
    summary() {
        return `${this.upstream.summary()} -> AsyncMap`;
    }
    async next() {
        const item = await this.upstream.next();
        if (item.done) {
            return { value: null, done: true };
        }
        const inputTensors = tf.tensor_util.getTensorsInContainer(item.value);
        // Careful: the transform may mutate the item in place.
        // That's why we have to remember the input Tensors above, and then
        // below dispose only those that were not passed through to the output.
        // Note too that the transform function is responsible for tidying
        // any intermediate Tensors.  Here we are concerned only about the
        // inputs.
        const mapped = await this.transform(item.value);
        const outputTensors = tf.tensor_util.getTensorsInContainer(mapped);
        // TODO(soergel) faster intersection
        // TODO(soergel) move to tf.disposeExcept(in, out)?
        for (const t of inputTensors) {
            if (!tf.tensor_util.isTensorInList(t, outputTensors)) {
                t.dispose();
            }
        }
        return { value: mapped, done: false };
    }
}
// Iterators that maintain a queue of pending items
// ============================================================================
/**
 * A base class for transforming streams that operate by maintaining an
 * output queue of elements that are ready to return via next().  This is
 * commonly required when the transformation is 1-to-many:  A call to next()
 * may trigger a call to the underlying stream, which will produce many
 * mapped elements of this stream-- of which we need to return only one, so
 * we have to queue the rest.
 *
 * Subclasses supply `pump()`, which should add items to `outputQueue` and
 * resolve to false once the upstream source is exhausted.
 */
export class OneToManyIterator extends LazyIterator {
    constructor() {
        super();
        this.outputQueue = new GrowingRingBuffer();
        this.lastRead = Promise.resolve({ value: null, done: false });
    }
    async next() {
        // This sets this.lastRead to a new Promise right away, as opposed to
        // saying `await this.lastRead; this.lastRead = this.serialNext();` which
        // would not work because this.lastRead would be updated only after the
        // promise resolves.
        this.lastRead = this.lastRead.then(() => this.serialNext());
        return this.lastRead;
    }
    async serialNext() {
        // Fetch so that the queue contains at least one item if possible.
        // If the upstream source is exhausted, AND there are no items left in
        // the output queue, then this stream is also exhausted.
        while (this.outputQueue.length() === 0) {
            // TODO(soergel): consider parallel reads.
            if (!await this.pump()) {
                return { value: null, done: true };
            }
        }
        return { value: this.outputQueue.shift(), done: false };
    }
}
/**
 * Applies a 1-to-many transform: each upstream element is mapped to an array
 * whose entries are emitted one at a time.
 */
class FlatmapIterator extends OneToManyIterator {
    /**
     * @param upstream The iterator to read from.
     * @param transform A function mapping an element to an array of output
     *   elements.
     */
    constructor(upstream, transform) {
        super();
        this.upstream = upstream;
        this.transform = transform;
    }
    summary() {
        return `${this.upstream.summary()} -> Flatmap`;
    }
    /**
     * Reads one upstream element and queues its transformed outputs.
     *
     * @returns A Promise for false when upstream is exhausted, true otherwise.
     */
    async pump() {
        const item = await this.upstream.next();
        if (item.done) {
            return false;
        }
        const inputTensors = tf.tensor_util.getTensorsInContainer(item.value);
        // Careful: the transform may mutate the item in place.
        // that's why we have to remember the input Tensors above, and then
        // below dispose only those that were not passed through to the output.
        // Note too that the transform function is responsible for tidying any
        // intermediate Tensors.  Here we are concerned only about the inputs.
        const mappedArray = this.transform(item.value);
        const outputTensors = tf.tensor_util.getTensorsInContainer(mappedArray);
        this.outputQueue.pushAll(mappedArray);
        // TODO(soergel) faster intersection, and deduplicate outputTensors
        // TODO(soergel) move to tf.disposeExcept(in, out)?
        for (const t of inputTensors) {
            if (!tf.tensor_util.isTensorInList(t, outputTensors)) {
                t.dispose();
            }
        }
        return true;
    }
}
/**
 * Provides a `LazyIterator` that concatenates a stream of underlying
 * streams.
 *
 * Doing this in a concurrency-safe way requires some trickery.  In
 * particular, we want this stream to return the elements from the
 * underlying streams in the correct order according to when next() was
 * called, even if the resulting Promises resolve in a different order.
 */
export class ChainedIterator extends LazyIterator {
    /**
     * @param iterators A stream of streams to be concatenated.
     * @param baseErrorHandler An optional error handler; when given, each
     *   base stream is wrapped with `handleErrors(baseErrorHandler)`.
     */
    constructor(iterators, baseErrorHandler) {
        super();
        this.baseErrorHandler = baseErrorHandler;
        // Strict Promise execution order:
        // a next() call may not even begin until the previous one completes.
        this.lastRead = null;
        // Local state that should not be clobbered by out-of-order execution.
        this.iterator = null;
        this.moreIterators = iterators;
    }
    summary() {
        const upstreamSummaries = 'TODO: fill in upstream of chained summaries';
        return `${upstreamSummaries} -> Chained`;
    }
    async next() {
        this.lastRead = this.readFromChain(this.lastRead);
        return this.lastRead;
    }
    /**
     * Reads the next element, moving on to the next base stream whenever the
     * current one is exhausted.
     *
     * @param lastRead The Promise of the preceding read, awaited first.
     */
    async readFromChain(lastRead) {
        // Must await on the previous read since the previous read may have advanced
        // the stream of streams, from which we need to read.
        // This is unfortunate since we can't parallelize reads. Which means
        // prefetching of chained streams is a no-op.
        // One solution is to prefetch immediately upstream of this.
        await lastRead;
        if (this.iterator == null) {
            const iteratorResult = await this.moreIterators.next();
            if (iteratorResult.done) {
                // No more streams to stream from.
                return { value: null, done: true };
            }
            this.iterator = iteratorResult.value;
            if (this.baseErrorHandler != null) {
                this.iterator = this.iterator.handleErrors(this.baseErrorHandler);
            }
        }
        const itemResult = await this.iterator.next();
        if (itemResult.done) {
            // Current base stream is finished; retry with the next one.
            this.iterator = null;
            return this.readFromChain(lastRead);
        }
        return itemResult;
    }
}
/**
 * Policy applied by a zipped iterator when one underlying stream ends before
 * the others.
 */
export var ZipMismatchMode;
(function (ZipMismatchMode) {
    ZipMismatchMode[ZipMismatchMode["FAIL"] = 0] = "FAIL";
    ZipMismatchMode[ZipMismatchMode["SHORTEST"] = 1] = "SHORTEST";
    ZipMismatchMode[ZipMismatchMode["LONGEST"] = 2] = "LONGEST"; // use nulls for exhausted streams; use up the longest stream.
})(ZipMismatchMode || (ZipMismatchMode = {}));
/**
 * Provides a `LazyIterator` that zips together an array, dict, or nested
 * structure of `LazyIterator`s (and perhaps additional constants).
 *
 * The underlying streams must provide elements in a consistent order such
 * that they correspond.
 *
 * Typically, the underlying streams should have the same number of
 * elements. If they do not, the behavior is determined by the
 * `mismatchMode` argument.
 *
 * The nested structure of the `iterators` argument determines the
 * structure of elements in the resulting iterator.
 *
 * Doing this in a concurrency-safe way requires some trickery.  In
 * particular, we want this stream to return the elements from the
 * underlying streams in the correct order according to when next() was
 * called, even if the resulting Promises resolve in a different order.
 *
 * @param iterators: An array or object containing LazyIterators at the
 *   leaves.
 * @param mismatchMode: Determines what to do when one underlying iterator
 *   is exhausted before the others.  `ZipMismatchMode.FAIL` (the default)
 *   causes an error to be thrown in this case.  `ZipMismatchMode.SHORTEST`
 *   causes the zipped iterator to terminate with the first underlying
 *   streams, so elements remaining on the longer streams are ignored.
 *   `ZipMismatchMode.LONGEST` causes the zipped stream to continue, filling
 *   in nulls for the exhausted streams, until all streams are exhausted.
 */
class ZipIterator extends LazyIterator {
    constructor(iterators, mismatchMode = ZipMismatchMode.FAIL) {
        super();
        this.iterators = iterators;
        this.mismatchMode = mismatchMode;
        // Number of zipped elements emitted so far (used in error messages).
        this.count = 0;
        this.currentPromise = null;
    }
    summary() {
        const upstreamSummaries = 'TODO: fill in upstream of zip summaries';
        return `{${upstreamSummaries}} -> Zip`;
    }
    /**
     * Reads one element from every underlying iterator and assembles the
     * zipped result.
     *
     * @param afterState The Promise of the preceding read, awaited first.
     */
    async nextState(afterState) {
        // This chaining ensures that the underlying next() are not even called
        // before the previous ones have resolved.
        await afterState;
        // Collect underlying iterator "done" signals as a side effect in
        // getNext()
        let numIterators = 0;
        let iteratorsDone = 0;
        function getNext(container) {
            if (container instanceof LazyIterator) {
                const result = container.next();
                return {
                    value: result.then(x => {
                        numIterators++;
                        if (x.done) {
                            iteratorsDone++;
                        }
                        return x.value;
                    }),
                    recurse: false
                };
            }
            else {
                return { value: null, recurse: true };
            }
        }
        const mapped = await deepMapAndAwaitAll(this.iterators, getNext);
        if (numIterators === iteratorsDone) {
            // The streams have all ended.
            return { value: null, done: true };
        }
        if (iteratorsDone > 0) {
            switch (this.mismatchMode) {
                case ZipMismatchMode.FAIL:
                    throw new Error('Zipped streams should have the same length. ' +
                        `Mismatched at element ${this.count}.`);
                case ZipMismatchMode.SHORTEST:
                    return { value: null, done: true };
                case ZipMismatchMode.LONGEST:
                default:
                // Continue.  The exhausted streams already produced value: null.
            }
        }
        this.count++;
        return { value: mapped, done: false };
    }
    async next() {
        this.currentPromise = this.nextState(this.currentPromise);
        return this.currentPromise;
    }
}
// Iterators that maintain a ring buffer of pending promises
// ============================================================================
/**
 * A stream that prefetches a given number of items from an upstream source,
 * returning them in FIFO order.
 *
 * Note this prefetches Promises, but makes no guarantees about when those
 * Promises resolve.
 */
export class PrefetchIterator extends LazyIterator {
    /**
     * @param upstream The iterator to read from.
     * @param bufferSize The capacity of the prefetch buffer.
     */
    constructor(upstream, bufferSize) {
        super();
        this.upstream = upstream;
        this.bufferSize = bufferSize;
        this.buffer = new RingBuffer(bufferSize);
    }
    summary() {
        return `${this.upstream.summary()} -> Prefetch`;
    }
    /**
     * Refill the prefetch buffer.  Keeps requesting upstream elements, without
     * awaiting them, and stores the resulting Promises until the buffer is
     * full.
     */
    refill() {
        while (!this.buffer.isFull()) {
            const v = this.upstream.next();
            this.buffer.push(v);
        }
    }
    next() {
        this.refill();
        // This shift will never throw an error because the buffer is always
        // full after a refill. If the stream is exhausted, the buffer will be
        // full of Promises that will resolve to the end-of-stream signal.
        return this.buffer.shift();
    }
}
/**
 * A stream that performs a sliding-window random shuffle on an upstream
 * source. This is like a `PrefetchIterator` except that the items are
 * returned in randomized order.  Mixing naturally improves as the buffer
 * size increases.
 */
export class ShuffleIterator extends PrefetchIterator {
    /**
     * @param upstream The iterator to read from.
     * @param windowSize The number of buffered elements to sample from.
     * @param seed (Optional.) Seed for the random generator.  When omitted
     *   (null or undefined), a seed derived from the current time is used.
     */
    constructor(upstream, windowSize, seed) {
        super(upstream, windowSize);
        this.upstream = upstream;
        this.windowSize = windowSize;
        // Local state that should not be clobbered by out-of-order execution.
        this.upstreamExhausted = false;
        // Fall back to a time-based seed only when no seed was supplied at all;
        // a falsy but explicit seed such as 0 or '' must stay reproducible.
        this.random = seedrandom.alea(seed != null ? seed : tf.util.now().toString());
        this.lastRead = Promise.resolve({ value: null, done: false });
    }
    async next() {
        // This sets this.lastRead to a new Promise right away, as opposed to
        // saying `await this.lastRead; this.lastRead = this.serialNext();` which
        // would not work because this.lastRead would be updated only after the
        // promise resolves.
        this.lastRead = this.lastRead.then(() => this.serialNext());
        return this.lastRead;
    }
    /** @returns A random integer in `[0, max)`. */
    randomInt(max) {
        return Math.floor(this.random() * max);
    }
    /** @returns A random index into the current buffer contents. */
    chooseIndex() {
        return this.randomInt(this.buffer.length());
    }
    /**
     * Removes and returns a randomly chosen buffered element, discarding
     * end-of-stream markers until the buffer is empty.
     */
    async serialNext() {
        // TODO(soergel): consider performance
        if (!this.upstreamExhausted) {
            this.refill();
        }
        while (!this.buffer.isEmpty()) {
            const chosenIndex = this.chooseIndex();
            const result = await this.buffer.shuffleExcise(chosenIndex);
            if (result.done) {
                this.upstreamExhausted = true;
            }
            else {
                this.refill();
                return result;
            }
        }
        return { value: null, done: true };
    }
}
C,OAAO,IAAI,eAAe,CAAC,aAAa,EAAE,gBAAgB,CAAC,CAAC;AAC9D,CAAC;AAED;;;;;;;;;;;;;;;GAeG;AACH,MAAM,UAAU,gCAAgC,CAC5C,YAAmD,EAAE,KAAa,EAClE,gBAAwC;IAC1C,OAAO,wBAAwB,CAC3B,oBAAoB,CAAC,YAAY,CAAC,CAAC,IAAI,CAAC,KAAK,CAAC,EAAE,gBAAgB,CAAC,CAAC;AACxE,CAAC;AAED;;;;;;;;;;;;;;;;;;;;;;;GAuBG;AACH,MAAM,UAAU,kBAAkB,CAC9B,SAA4B,EAC5B,eAAgC,eAAe,CAAC,IAAI;IACtD,OAAO,IAAI,WAAW,CAAI,SAAS,EAAE,YAAY,CAAC,CAAC;AACrD,CAAC;AAED;;;;;;GAMG;AACH,MAAM,OAAgB,YAAY;IAgBhC;;;;;;;OAOG;IACH,KAAK,CAAC,OAAO;QACX,MAAM,MAAM,GAAQ,EAAE,CAAC;QACvB,IAAI,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;QAC1B,OAAO,CAAC,CAAC,CAAC,IAAI,EAAE;YACd,MAAM,CAAC,IAAI,CAAC,CAAC,CAAC,KAAK,CAAC,CAAC;YACrB,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;SACvB;QACD,OAAO,MAAM,CAAC;IAChB,CAAC;IAED;;;;;;;;;;OAUG;IACH,KAAK,CAAC,cAAc;QAClB,MAAM,MAAM,GAAG,IAAI,CAAC,QAAQ,CAAC,GAAG,CAAC,CAAC;QAClC,MAAM,MAAM,GAAQ,EAAE,CAAC;QACvB,IAAI,CAAC,GAAG,MAAM,MAAM,CAAC,IAAI,EAAE,CAAC;QAC5B,OAAO,CAAC,CAAC,CAAC,IAAI,EAAE;YACd,MAAM,CAAC,IAAI,CAAC,CAAC,CAAC,KAAK,CAAC,CAAC;YACrB,CAAC,GAAG,MAAM,MAAM,CAAC,IAAI,EAAE,CAAC;SACzB;QACD,OAAO,MAAM,CAAC;IAChB,CAAC;IAED;;;;;;OAMG;IACH,KAAK,CAAC,YAAY;QAChB,IAAI,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;QAC1B,OAAO,CAAC,CAAC,CAAC,IAAI,EAAE;YACd,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;SACvB;IACH,CAAC;IAED;;;;;;OAMG;IACH,KAAK,CAAC,YAAY,CAAC,SAA4B;QAC7C,IAAI,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;QAC1B,IAAI,cAAc,GAAG,SAAS,CAAC,CAAC,CAAC,KAAK,CAAC,CAAC;QACxC,OAAO,CAAC,CAAC,CAAC,CAAC,IAAI,CAAC,IAAI,cAAc,EAAE;YAClC,CAAC,GAAG,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC;YACtB,cAAc,GAAG,SAAS,CAAC,CAAC,CAAC,KAAK,CAAC,CAAC;SACrC;IACH,CAAC;IAED;;;;;;;;;;;OAWG;IACH,YAAY,CAAC,OAAkC;QAC7C,OAAO,IAAI,yBAAyB,CAAC,IAAI,EAAE,OAAO,CAAC,CAAC;IACtD,CAAC;IAED,yCAAyC;IAEzC;;;;;;;OAOG;IACH,MAAM,CAAC,SAAgC;QACrC,OAAO,IAAI,cAAc,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;IAC7C,CAAC;IAED;;;;;;;OAOG;IACH,GAAG,CAAI,SAA0B;QAC/B,OAAO,IAAI,WAAW,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;IAC1C,CAAC;IAED;;;;;;;OAOG;IACH,QAAQ,CAAI,SAAmC;QAC7C,OAAO,IAAI,gBAAgB,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;IAC/C,CAAC;IAED;;;
;;;;OAOG;IACH,cAAc,CAAI,SAAmC;QACnD,OAAO,IAAI,gBAAgB,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC,MAAM,EAAE,CAAC;IACxD,CAAC;IAED;;;;;;;OAOG;IACH,OAAO,CAAI,SAA4B;QACrC,OAAO,IAAI,eAAe,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;IAC9C,CAAC;IAED;;;;OAIG;IACH,KAAK,CAAC,YAAY,CAAC,CAAqB;QACtC,OAAO,IAAI,CAAC,GAAG,CAAC,CAAC,CAAC,CAAC,YAAY,EAAE,CAAC;IACpC,CAAC;IAED;;;;;;OAMG;IACH,KAAK,CAAC,aAAa,CAAC,CAAiC;QACnD,OAAO,IAAI,CAAC,cAAc,CAAC,CAAC,CAAC,CAAC,YAAY,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,KAAK,IAAI,CAAC,CAAC,CAAC;IAChE,CAAC;IAED;;;;;;;;;;;;;;;;;OAiBG;IACH,aAAa,CAAC,SAAiB,EAAE,cAAc,GAAG,IAAI;QACpD,OAAO,IAAI,qBAAqB,CAAC,IAAI,EAAE,SAAS,EAAE,cAAc,CAAC,CAAC;IACpE,CAAC;IAED;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;OA+BG;IACH,gBAAgB,CACZ,SAAiB,EAAE,cAAc,GAAG,IAAI;IACxC,kCAAkC;IAClC,QAAsC,SAAS;QAEjD,2EAA2E;QAC3E,MAAM,UAAU,GAAG,IAAI,CAAC,aAAa,CAAC,SAAS,EAAE,cAAc,CAAC,CAAC;QACjE,2EAA2E;QAC3E,uEAAuE;QACvE,OAAO,UAAU,CAAC,GAAG,CAAC,CAAC,CAAC,EAAE,CAAC,OAAO,CAAC,CAAC,EAAE,KAAK,CAAC,CAAC,CAAC;IAChD,CAAC;IAED;;;;;;;;;OASG;IACH,WAAW,CACP,QAAyB,EACzB,gBAAwC;QAC1C,OAAO,IAAI,eAAe,CACtB,iBAAiB,CAAC,CAAC,IAAI,EAAE,QAAQ,CAAC,CAAC,EAAE,gBAAgB,CAAC,CAAC;IAC7D,CAAC;IAED;;;;;;OAMG;IACH,IAAI,CAAC,KAAa;QAChB,IAAI,KAAK,GAAG,CAAC,IAAI,KAAK,IAAI,IAAI,EAAE;YAC9B,OAAO,IAAI,CAAC;SACb;QACD,OAAO,IAAI,YAAY,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;IACvC,CAAC;IAED;;;;;OAKG;IACH,IAAI,CAAC,KAAa;QAChB,IAAI,KAAK,GAAG,CAAC,IAAI,KAAK,IAAI,IAAI,EAAE;YAC9B,OAAO,IAAI,CAAC;SACb;QACD,OAAO,IAAI,YAAY,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;IACvC,CAAC;IAED;;;;;;;;OAQG;IACH,QAAQ,CAAC,UAAkB;QACzB,OAAO,IAAI,gBAAgB,CAAC,IAAI,EAAE,UAAU,CAAC,CAAC;IAChD,CAAC;IAED,uDAAuD;IAEvD;;;;;;;OAOG;IACH,OAAO,CAAC,UAAkB,EAAE,IAAa;QACvC,OAAO,IAAI,eAAe,CAAC,IAAI,EAAE,UAAU,EAAE,IAAI,CAAC,CAAC;IACrD,CAAC;IAED;;;OAGG;IACH,MAAM;QACJ,OAAO,IAAI,cAAc,CAAC,IAAI,CAAC,CAAC;IAClC,CAAC;CACF;AAED,+EAA+E;AAC/E,yEAAyE;AACzE,0EAA0E;AAC1E,kDAAkD;AAClD,+EAA+E;AAE/E,mDAAmD;AACnD,+EAA+E;AAE/E,MAAM,aAAiB,SAAQ,YAAe;IAE5C,YAAsB,KAAU;QAC9B,KAAK,EAAE,CAAC;QADY,UAAK,GAAL,KAAK,CAAK;QADxB,SAAI,GAAG,CAAC,CAAC;IAGjB,CAAC;IAED,OAAO;QACL,OA
AO,YAAY,IAAI,CAAC,KAAK,CAAC,MAAM,QAAQ,CAAC;IAC/C,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI,IAAI,CAAC,IAAI,IAAI,IAAI,CAAC,KAAK,CAAC,MAAM,EAAE;YAClC,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;SAClC;QACD,MAAM,IAAI,GAAG,IAAI,CAAC,KAAK,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC;QACnC,IAAI,CAAC,IAAI,EAAE,CAAC;QACZ,OAAO,EAAC,KAAK,EAAE,SAAS,CAAC,IAAI,CAAC,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IAC/C,CAAC;CACF;AAED,MAAM,oBAAwB,SAAQ,YAAe;IACnD,YACc,MAA2D;QACvE,KAAK,EAAE,CAAC;QADI,WAAM,GAAN,MAAM,CAAqD;IAEzE,CAAC;IAED,OAAO;QACL,OAAO,eAAe,CAAC;IACzB,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI;YACF,OAAO,IAAI,CAAC,MAAM,EAAE,CAAC;SACtB;QAAC,OAAO,CAAC,EAAE;YACV,4DAA4D;YAC5D,CAAC,CAAC,OAAO;gBACL,mDAAmD,CAAC,CAAC,OAAO,EAAE,CAAC;YACnE,MAAM,CAAC,CAAC;SACT;IACH,CAAC;CACF;AAED,MAAM,cAAkB,SAAQ,YAAe;IAK7C,YAAsB,QAAyB;QAC7C,KAAK,EAAE,CAAC;QADY,aAAQ,GAAR,QAAQ,CAAiB;QAE7C,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,YAAY,CAAC;IAChD,CAAC;IAED,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,KAAK,CAAC,UAAU;QACtB,OAAO,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;IAC9B,CAAC;CACF;AAED,MAAM,YAAgB,SAAQ,YAAe;IAQ3C,YAAsB,QAAyB,EAAY,QAAgB;QACzE,KAAK,EAAE,CAAC;QADY,aAAQ,GAAR,QAAQ,CAAiB;QAAY,aAAQ,GAAR,QAAQ,CAAQ;QAH3E,sEAAsE;QACtE,UAAK,GAAG,CAAC,CAAC;QAIR,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,UAAU,CAAC;IAC9C,CAAC;IAED,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,KAAK,CAAC,UAAU;QACtB,gEAAgE;QAChE,8DAA8D;QAC9D,yEAAyE;QACzE,oBAAoB;QACpB,OAAO,IAAI,CAAC,KAAK,EAAE,GAAG,IAAI,CAAC,QAAQ,E
AAE;YACnC,MAAM,OAAO,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;YAC3C,6CAA6C;YAC7C,IAAI,OAAO,CAAC,IAAI,EAAE;gBAChB,OAAO,OAAO,CAAC;aAChB;YACD,EAAE,CAAC,OAAO,CAAC,OAAO,CAAC,KAAW,CAAC,CAAC;SACjC;QACD,OAAO,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;IAC9B,CAAC;CACF;AAED,MAAM,YAAgB,SAAQ,YAAe;IAE3C,YAAsB,QAAyB,EAAY,QAAgB;QACzE,KAAK,EAAE,CAAC;QADY,aAAQ,GAAR,QAAQ,CAAiB;QAAY,aAAQ,GAAR,QAAQ,CAAQ;QAD3E,UAAK,GAAG,CAAC,CAAC;IAGV,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,UAAU,CAAC;IAC9C,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI,IAAI,CAAC,KAAK,EAAE,IAAI,IAAI,CAAC,QAAQ,EAAE;YACjC,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;SAClC;QACD,OAAO,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;IAC9B,CAAC;CACF;AAED,kEAAkE;AAClE,6EAA6E;AAC7E,SAAS;AACT,MAAM,qBAAyB,SAAQ,YAAiB;IAKtD,YACc,QAAyB,EAAY,SAAiB,EACtD,uBAAuB,IAAI;QACvC,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QAAY,cAAS,GAAT,SAAS,CAAQ;QACtD,yBAAoB,GAApB,oBAAoB,CAAO;QAEvC,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,mBAAmB,CAAC;IACvD,CAAC;IAED,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,KAAK,CAAC,UAAU;QACtB,MAAM,KAAK,GAAQ,EAAE,CAAC;QACtB,OAAO,KAAK,CAAC,MAAM,GAAG,IAAI,CAAC,SAAS,EAAE;YACpC,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;YACxC,IAAI,IAAI,CAAC,IAAI,EAAE;gBACb,IAAI,IAAI,CAAC,oBAAoB,IAAI,KAAK,CAAC,MAAM,GAAG,CAAC,EAAE;oBACjD,OAAO,EAAC,KAAK,EAAE,KAAK,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;iBACpC;gBACD,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;aAClC;YACD,KAAK,CAAC,IAAI,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;SACxB;QACD,OAAO,EAAC,KAAK,EAAE,KAAK,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IACrC,CAAC;CACF;AAED,MAAM,cAAkB,SAAQ,YAAe;IAK7C,YACc,QAAyB,EACzB,SAAgC;QAC5C,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QACzB,cAAS,GAAT,SAAS,CAAuB;QAE5C,IAAI,CAAC,QAAQ,GAAG,OAAO,CA
AC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,YAAY,CAAC;IAChD,CAAC;IAED,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,KAAK,CAAC,UAAU;QACtB,OAAO,IAAI,EAAE;YACX,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;YACxC,IAAI,IAAI,CAAC,IAAI,IAAI,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,KAAK,CAAC,EAAE;gBAC3C,OAAO,IAAI,CAAC;aACb;YACD,EAAE,CAAC,OAAO,CAAC,IAAI,CAAC,KAAW,CAAC,CAAC;SAC9B;IACH,CAAC;CACF;AAED,MAAM,WAAkB,SAAQ,YAAe;IAC7C,YACc,QAAyB,EACzB,SAA0B;QACtC,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QACzB,cAAS,GAAT,SAAS,CAAiB;IAExC,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,SAAS,CAAC;IAC7C,CAAC;IAED,KAAK,CAAC,IAAI;QACR,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;QACxC,IAAI,IAAI,CAAC,IAAI,EAAE;YACb,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;SAClC;QACD,MAAM,YAAY,GAAG,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,IAAI,CAAC,KAAW,CAAC,CAAC;QAC5E,uDAAuD;QACvD,mEAAmE;QACnE,uEAAuE;QACvE,kEAAkE;QAClE,kEAAkE;QAClE,UAAU;QACV,MAAM,MAAM,GAAG,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;QAC1C,MAAM,aAAa,GAAG,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,MAAY,CAAC,CAAC;QAEzE,oCAAoC;QACpC,mDAAmD;QACnD,KAAK,MAAM,CAAC,IAAI,YAAY,EAAE;YAC5B,IAAI,CAAC,EAAE,CAAC,WAAW,CAAC,cAAc,CAAC,CAAC,EAAE,aAAa,CAAC,EAAE;gBACpD,CAAC,CAAC,OAAO,EAAE,CAAC;aACb;SACF;QACD,OAAO,EAAC,KAAK,EAAE,MAAM,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IACtC,CAAC;CACF;AAED,MAAM,yBAA6B,SAAQ,YAAe;IAExD,YACc,QAAyB,EACzB,OAAkC;QAC9C,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QACzB,YAAO,GAAP,OAAO,CAA2B;QAHhD,UAAK,GAAG,CAAC,CAAC;QAKR,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,kBAAkB,CAAC;IACtD,CAAC;IAMD,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI
,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAED,KAAK,CAAC,UAAU;QACd,OAAO,IAAI,EAAE;YACX,IAAI;gBACF,OAAO,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;aACnC;YAAC,OAAO,CAAC,EAAE;gBACV,IAAI,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC,CAAC,EAAE;oBACpB,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;iBAClC;gBACD,sEAAsE;gBAEtE,sEAAsE;gBACtE,uEAAuE;gBACvE,wEAAwE;aACzE;SACF;IACH,CAAC;CACF;AAED,MAAM,gBAAuB,SAAQ,YAAe;IAClD,YACc,QAAyB,EACzB,SAAmC;QAC/C,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QACzB,cAAS,GAAT,SAAS,CAA0B;IAEjD,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,cAAc,CAAC;IAClD,CAAC;IAED,KAAK,CAAC,IAAI;QACR,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;QACxC,IAAI,IAAI,CAAC,IAAI,EAAE;YACb,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;SAClC;QACD,MAAM,YAAY,GAAG,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,IAAI,CAAC,KAAW,CAAC,CAAC;QAC5E,uDAAuD;QACvD,mEAAmE;QACnE,uEAAuE;QACvE,kEAAkE;QAClE,kEAAkE;QAClE,UAAU;QACV,MAAM,MAAM,GAAG,MAAM,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;QAChD,MAAM,aAAa,GAAG,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,MAAY,CAAC,CAAC;QAEzE,oCAAoC;QACpC,mDAAmD;QACnD,KAAK,MAAM,CAAC,IAAI,YAAY,EAAE;YAC5B,IAAI,CAAC,EAAE,CAAC,WAAW,CAAC,cAAc,CAAC,CAAC,EAAE,aAAa,CAAC,EAAE;gBACpD,CAAC,CAAC,OAAO,EAAE,CAAC;aACb;SACF;QACD,OAAO,EAAC,KAAK,EAAE,MAAM,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IACtC,CAAC;CACF;AAED,mDAAmD;AACnD,+EAA+E;AAE/E;;;;;;;GAOG;AACH,MAAM,OAAgB,iBAAqB,SAAQ,YAAe;IAQhE;QACE,KAAK,EAAE,CAAC;QACR,IAAI,CAAC,WAAW,GAAG,IAAI,iBAAiB,EAAK,CAAC;QAC9C,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAED,KAAK,CAAC,IAAI;QACR,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAgBD,KAAK,CAAC,UAAU;QACd,kEAAkE;QAClE,sEAAsE;QACtE,wDAAwD;QACxD,OAAO,IAAI,CAAC,WAAW,CAAC,MAAM,EAAE,KAAK,CAAC,EAAE;YACtC,0CAA0C;Y
AC1C,IAAI,CAAC,MAAM,IAAI,CAAC,IAAI,EAAE,EAAE;gBACtB,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;aAClC;SACF;QACD,OAAO,EAAC,KAAK,EAAE,IAAI,CAAC,WAAW,CAAC,KAAK,EAAE,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IACxD,CAAC;CACF;AACD,MAAM,eAAsB,SAAQ,iBAAoB;IACtD,YACc,QAAyB,EACzB,SAA4B;QACxC,KAAK,EAAE,CAAC;QAFI,aAAQ,GAAR,QAAQ,CAAiB;QACzB,cAAS,GAAT,SAAS,CAAmB;IAE1C,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,aAAa,CAAC;IACjD,CAAC;IAED,KAAK,CAAC,IAAI;QACR,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;QACxC,IAAI,IAAI,CAAC,IAAI,EAAE;YACb,OAAO,KAAK,CAAC;SACd;QACD,MAAM,YAAY,GAAG,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,IAAI,CAAC,KAAW,CAAC,CAAC;QAC5E,uDAAuD;QACvD,mEAAmE;QACnE,uEAAuE;QACvE,sEAAsE;QACtE,sEAAsE;QACtE,MAAM,WAAW,GAAG,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;QAC/C,MAAM,aAAa,GACf,EAAE,CAAC,WAAW,CAAC,qBAAqB,CAAC,WAAiB,CAAC,CAAC;QAC5D,IAAI,CAAC,WAAW,CAAC,OAAO,CAAC,WAAW,CAAC,CAAC;QAEtC,mEAAmE;QACnE,mDAAmD;QACnD,KAAK,MAAM,CAAC,IAAI,YAAY,EAAE;YAC5B,IAAI,CAAC,EAAE,CAAC,WAAW,CAAC,cAAc,CAAC,CAAC,EAAE,aAAa,CAAC,EAAE;gBACpD,CAAC,CAAC,OAAO,EAAE,CAAC;aACb;SACF;QAED,OAAO,IAAI,CAAC;IACd,CAAC;CACF;AAED;;;;;;;;GAQG;AACH,MAAM,OAAO,eAAmB,SAAQ,YAAe;IASrD,YACI,SAAwC,EACvB,gBAAwC;QAC3D,KAAK,EAAE,CAAC;QADW,qBAAgB,GAAhB,gBAAgB,CAAwB;QAV7D,kCAAkC;QAClC,qEAAqE;QAC7D,aAAQ,GAA+B,IAAI,CAAC;QAEpD,sEAAsE;QAC9D,aAAQ,GAAoB,IAAI,CAAC;QAOvC,IAAI,CAAC,aAAa,GAAG,SAAS,CAAC;IACjC,CAAC;IAED,OAAO;QACL,MAAM,iBAAiB,GAAG,6CAA6C,CAAC;QACxE,OAAO,GAAG,iBAAiB,aAAa,CAAC;IAC3C,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,aAAa,CAAC,IAAI,CAAC,QAAQ,CAAC,CAAC;QAClD,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,KAAK,CAAC,aAAa,CAAC,QAAoC;QAE9D,4EAA4E;QAC5E,qDAAqD;QACrD,oEAAoE;QACpE,6CAA6C;QAC7C,4DAA4D;QAC5D,MAAM,QAAQ,CAAC;QACf,IAAI,IAAI,CAAC,QAAQ,IAAI,IAAI,EAAE;YACzB,MAAM,cAAc,GAAG,MAAM,IAAI,CAAC,aAAa,CAAC,IAAI,EAAE,CAAC;YACvD,IAAI,cAAc,CAAC,IAAI,EAAE;gBACvB,kCAAkC;gBAClC,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;aAClC;YACD,IAAI,CAAC,QAAQ,GAAG,cAAc,CAAC,KAAK,CAAC;YACrC,IAAI,IAAI,CAAC,gB
AAgB,IAAI,IAAI,EAAE;gBACjC,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,YAAY,CAAC,IAAI,CAAC,gBAAgB,CAAC,CAAC;aACnE;SACF;QACD,MAAM,UAAU,GAAG,MAAM,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;QAC9C,IAAI,UAAU,CAAC,IAAI,EAAE;YACnB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC;YACrB,OAAO,IAAI,CAAC,aAAa,CAAC,QAAQ,CAAC,CAAC;SACrC;QACD,OAAO,UAAU,CAAC;IACpB,CAAC;CACF;AAED,MAAM,CAAN,IAAY,eAIX;AAJD,WAAY,eAAe;IACzB,qDAAI,CAAA;IACJ,6DAAQ,CAAA;IACR,2DAAO,CAAA,CAAI,8DAA8D;AAC3E,CAAC,EAJW,eAAe,KAAf,eAAe,QAI1B;AAED;;;;;;;;;;;;;;;;;;;;;;;;;;;;GA4BG;AACH,MAAM,WAA0C,SAAQ,YAAe;IAIrE,YACuB,SAA4B,EAC5B,eAAgC,eAAe,CAAC,IAAI;QACzE,KAAK,EAAE,CAAC;QAFa,cAAS,GAAT,SAAS,CAAmB;QAC5B,iBAAY,GAAZ,YAAY,CAAwC;QALnE,UAAK,GAAG,CAAC,CAAC;QACV,mBAAc,GAA+B,IAAI,CAAC;IAM1D,CAAC;IAED,OAAO;QACL,MAAM,iBAAiB,GAAG,yCAAyC,CAAC;QACpE,OAAO,IAAI,iBAAiB,UAAU,CAAC;IACzC,CAAC;IAEO,KAAK,CAAC,SAAS,CAAC,UAAsC;QAE5D,uEAAuE;QACvE,0CAA0C;QAC1C,MAAM,UAAU,CAAC;QAEjB,iEAAiE;QACjE,YAAY;QACZ,IAAI,YAAY,GAAG,CAAC,CAAC;QACrB,IAAI,aAAa,GAAG,CAAC,CAAC;QAEtB,SAAS,OAAO,CAAC,SAA4B;YAC3C,IAAI,SAAS,YAAY,YAAY,EAAE;gBACrC,MAAM,MAAM,GAAG,SAAS,CAAC,IAAI,EAAE,CAAC;gBAChC,OAAO;oBACL,KAAK,EAAE,MAAM,CAAC,IAAI,CAAC,CAAC,CAAC,EAAE;wBACrB,YAAY,EAAE,CAAC;wBACf,IAAI,CAAC,CAAC,IAAI,EAAE;4BACV,aAAa,EAAE,CAAC;yBACjB;wBACD,OAAO,CAAC,CAAC,KAAK,CAAC;oBACjB,CAAC,CAAC;oBACF,OAAO,EAAE,KAAK;iBACf,CAAC;aACH;iBAAM;gBACL,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,OAAO,EAAE,IAAI,EAAC,CAAC;aACrC;QACH,CAAC;QAED,MAAM,MAAM,GAAM,MAAM,kBAAkB,CAAC,IAAI,CAAC,SAAS,EAAE,OAAO,CAAC,CAAC;QAEpE,IAAI,YAAY,KAAK,aAAa,EAAE;YAClC,8BAA8B;YAC9B,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;SAClC;QACD,IAAI,aAAa,GAAG,CAAC,EAAE;YACrB,QAAQ,IAAI,CAAC,YAAY,EAAE;gBACzB,KAAK,eAAe,CAAC,IAAI;oBACvB,MAAM,IAAI,KAAK,CACX,8CAA8C;wBAC9C,yBAAyB,IAAI,CAAC,KAAK,GAAG,CAAC,CAAC;gBAC9C,KAAK,eAAe,CAAC,QAAQ;oBAC3B,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;gBACnC,KAAK,eAAe,CAAC,OAAO,CAAC;gBAC7B,QAAQ;gBACN,iEAAiE;aACpE;SACF;QAED,IAAI,CAAC,KAAK,EAAE,CAAC;QACb,OAAO,EAAC,KAAK,EAAE,MAAM,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC;IACtC,CAAC;IAED,KAAK
,CAAC,IAAI;QACR,IAAI,CAAC,cAAc,GAAG,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,cAAc,CAAC,CAAC;QAC1D,OAAO,IAAI,CAAC,cAAc,CAAC;IAC7B,CAAC;CACF;AAED,4DAA4D;AAC5D,+EAA+E;AAE/E;;;;;;GAMG;AACH,MAAM,OAAO,gBAAoB,SAAQ,YAAe;IAGtD,YACc,QAAyB,EAAY,UAAkB;QACnE,KAAK,EAAE,CAAC;QADI,aAAQ,GAAR,QAAQ,CAAiB;QAAY,eAAU,GAAV,UAAU,CAAQ;QAEnE,IAAI,CAAC,MAAM,GAAG,IAAI,UAAU,CAA6B,UAAU,CAAC,CAAC;IACvE,CAAC;IAED,OAAO;QACL,OAAO,GAAG,IAAI,CAAC,QAAQ,CAAC,OAAO,EAAE,cAAc,CAAC;IAClD,CAAC;IAED;;;OAGG;IACO,MAAM;QACd,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,MAAM,EAAE,EAAE;YAC5B,MAAM,CAAC,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC;YAC/B,IAAI,CAAC,MAAM,CAAC,IAAI,CAAC,CAAC,CAAC,CAAC;SACrB;IACH,CAAC;IAED,IAAI;QACF,IAAI,CAAC,MAAM,EAAE,CAAC;QACd,oEAAoE;QACpE,sEAAsE;QACtE,kEAAkE;QAClE,OAAO,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;IAC7B,CAAC;CACF;AAED;;;;;GAKG;AACH,MAAM,OAAO,eAAmB,SAAQ,gBAAmB;IAUzD,YACqB,QAAyB,EAAY,UAAkB,EACxE,IAAa;QACf,KAAK,CAAC,QAAQ,EAAE,UAAU,CAAC,CAAC;QAFT,aAAQ,GAAR,QAAQ,CAAiB;QAAY,eAAU,GAAV,UAAU,CAAQ;QAJ5E,sEAAsE;QAC9D,sBAAiB,GAAG,KAAK,CAAC;QAMhC,IAAI,CAAC,MAAM,GAAG,UAAU,CAAC,IAAI,CAAC,IAAI,IAAI,EAAE,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,QAAQ,EAAE,CAAC,CAAC;QAChE,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,OAAO,CAAC,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAC,CAAC,CAAC;IAC9D,CAAC;IAEQ,KAAK,CAAC,IAAI;QACjB,qEAAqE;QACrE,yEAAyE;QACzE,uEAAuE;QACvE,oBAAoB;QACpB,IAAI,CAAC,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC,IAAI,CAAC,UAAU,EAAE,CAAC,CAAC;QAC5D,OAAO,IAAI,CAAC,QAAQ,CAAC;IACvB,CAAC;IAEO,SAAS,CAAC,GAAW;QAC3B,OAAO,IAAI,CAAC,KAAK,CAAC,IAAI,CAAC,MAAM,EAAE,GAAG,GAAG,CAAC,CAAC;IACzC,CAAC;IAES,WAAW;QACnB,OAAO,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,MAAM,CAAC,MAAM,EAAE,CAAC,CAAC;IAC9C,CAAC;IAED,KAAK,CAAC,UAAU;QACd,sCAAsC;QACtC,IAAI,CAAC,IAAI,CAAC,iBAAiB,EAAE;YAC3B,IAAI,CAAC,MAAM,EAAE,CAAC;SACf;QACD,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,OAAO,EAAE,EAAE;YAC7B,MAAM,WAAW,GAAG,IAAI,CAAC,WAAW,EAAE,CAAC;YACvC,MAAM,MAAM,GAAG,MAAM,IAAI,CAAC,MAAM,CAAC,aAAa,CAAC,WAAW,CAAC,CAAC;YAC5D,IAAI,MAAM,CAAC,IAAI,EAAE;gBACf,IAAI,CAAC,iBAAiB,GAAG,IAAI,CAAC;aAC/B;iBAAM;gBACL,IAAI,CAAC
,MAAM,EAAE,CAAC;gBACd,OAAO,MAAM,CAAC;aACf;SACF;QACD,OAAO,EAAC,KAAK,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAC,CAAC;IACnC,CAAC;CACF","sourcesContent":["/**\n * @license\n * Copyright 2018 Google LLC. All Rights Reserved.\n * Licensed under the Apache License, Version 2.0 (the \"License\");\n * you may not use this file except in compliance with the License.\n * You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n *\n * =============================================================================\n */\n\nimport * as tf from '@tensorflow/tfjs-core';\nimport * as seedrandom from 'seedrandom';\n\nimport {Container} from '../types';\nimport {deepClone} from '../util/deep_clone';\nimport {deepMapAndAwaitAll, DeepMapAsyncResult, DeepMapResult, deepZip, zipToList} from '../util/deep_map';\nimport {GrowingRingBuffer} from '../util/growing_ring_buffer';\nimport {RingBuffer} from '../util/ring_buffer';\n\n/**\n * A nested structure of LazyIterators, used as the input to zip().\n */\nexport type IteratorContainer = Container<LazyIterator<tf.TensorContainer>>;\n\n// Here we implement a simple asynchronous iterator.\n// This lets us avoid using either third-party stream libraries or\n// recent TypeScript language support requiring polyfills.\n\n/**\n * Create a `LazyIterator` from an array of items.\n */\nexport function iteratorFromItems<T>(items: T[]): LazyIterator<T> {\n  return new ArrayIterator(items);\n}\n\n/**\n * Create a `LazyIterator` of incrementing integers.\n */\nexport function iteratorFromIncrementing(start: number): LazyIterator<number> {\n  let i = start;\n  return iteratorFromFunction(() => ({value: i++, 
done: false}));\n}\n\n/**\n * Create a `LazyIterator` from a function.\n *\n * ```js\n * let i = -1;\n * const func = () =>\n *    ++i < 5 ? {value: i, done: false} : {value: null, done: true};\n * const iter = tf.data.iteratorFromFunction(func);\n * await iter.forEachAsync(e => console.log(e));\n * ```\n *\n * @param func A function that produces data on each call.\n */\nexport function iteratorFromFunction<T>(\n    func: () =>\n        IteratorResult<T>| Promise<IteratorResult<T>>): LazyIterator<T> {\n  return new FunctionCallIterator(func);\n}\n\n/**\n * Create a `LazyIterator` by concatenating underlying streams, which are\n * themselves provided as a stream.\n *\n * This can also be thought of as a \"stream flatten\" operation.\n *\n * @param baseIterators A stream of streams to be concatenated.\n * @param baseErrorHandler An optional function that can intercept `Error`s\n *   raised during a `next()` call on the base stream.  This function can decide\n *   whether the error should be propagated, whether the error should be\n *   ignored, or whether the base stream should be terminated.\n */\nexport function iteratorFromConcatenated<T>(\n    baseIterators: LazyIterator<LazyIterator<T>>,\n    baseErrorHandler?: (e: Error) => boolean): LazyIterator<T> {\n  return new ChainedIterator(baseIterators, baseErrorHandler);\n}\n\n/**\n * Create a `LazyIterator` by concatenating streams produced by calling a\n * stream-generating function a given number of times.\n *\n * Since a `LazyIterator` is read-once, it cannot be repeated, but this\n * function can be used to achieve a similar effect:\n *\n *   LazyIterator.ofConcatenatedFunction(() => new MyIterator(), 6);\n *\n * @param iteratorFunc: A function that produces a new stream on each call.\n * @param count: The number of times to call the function.\n * @param baseErrorHandler An optional function that can intercept `Error`s\n *   raised during a `next()` call on the base stream.  
This function can decide\n *   whether the error should be propagated, whether the error should be\n *   ignored, or whether the base stream should be terminated.\n */\nexport function iteratorFromConcatenatedFunction<T>(\n    iteratorFunc: () => IteratorResult<LazyIterator<T>>, count: number,\n    baseErrorHandler?: (e: Error) => boolean): LazyIterator<T> {\n  return iteratorFromConcatenated(\n      iteratorFromFunction(iteratorFunc).take(count), baseErrorHandler);\n}\n\n/**\n * Create a `LazyIterator` by zipping together an array, dict, or nested\n * structure of `LazyIterator`s (and perhaps additional constants).\n *\n * The underlying streams must provide elements in a consistent order such\n * that they correspond.\n *\n * Typically, the underlying streams should have the same number of\n * elements. If they do not, the behavior is determined by the\n * `mismatchMode` argument.\n *\n * The nested structure of the `iterators` argument determines the\n * structure of elements in the resulting iterator.\n *\n * @param iterators: An array or object containing LazyIterators at the\n * leaves.\n * @param mismatchMode: Determines what to do when one underlying iterator\n * is exhausted before the others.  `ZipMismatchMode.FAIL` (the default)\n * causes an error to be thrown in this case.  
`ZipMismatchMode.SHORTEST`\n * causes the zipped iterator to terminate with the furst underlying\n * streams, so elements remaining on the longer streams are ignored.\n * `ZipMismatchMode.LONGEST` causes the zipped stream to continue, filling\n * in nulls for the exhausted streams, until all streams are exhausted.\n */\nexport function iteratorFromZipped<O extends tf.TensorContainer>(\n    iterators: IteratorContainer,\n    mismatchMode: ZipMismatchMode = ZipMismatchMode.FAIL): LazyIterator<O> {\n  return new ZipIterator<O>(iterators, mismatchMode);\n}\n\n/**\n * An asynchronous iterator, providing lazy access to a potentially\n * unbounded stream of elements.\n *\n * Iterator can be obtained from a dataset:\n * `const iter = await dataset.iterator();`\n */\nexport abstract class LazyIterator<T> {\n  // This class implements AsyncIterator<T>, but we have not yet set the\n  // TypeScript --downlevelIteration flag to enable that.\n\n  abstract summary(): string;\n\n  /**\n   * Returns a `Promise` for the next element in the stream.\n   *\n   * When an item can be provided successfully, the return value is\n   * `{value:T, done:false}`.\n   *\n   * Calling next() on a closed stream returns `{value:null, done:true}`.\n   */\n  abstract next(): Promise<IteratorResult<T>>;\n\n  /**\n   * Collect all remaining elements of a bounded stream into an array.\n   * Obviously this will succeed only for small streams that fit in memory.\n   * Useful for testing.\n   *\n   * @returns A Promise for an array of stream elements, which will resolve\n   *   when the stream is exhausted.\n   */\n  async toArray(): Promise<T[]> {\n    const result: T[] = [];\n    let x = await this.next();\n    while (!x.done) {\n      result.push(x.value);\n      x = await this.next();\n    }\n    return result;\n  }\n\n  /**\n   * Collect all elements of this dataset into an array with prefetching 100\n   * elements. 
This is useful for testing, because the prefetch changes the\n   * order in which the Promises are resolved along the processing pipeline.\n   * This may help expose bugs where results are dependent on the order of\n   * Promise resolution rather than on the logical order of the stream (i.e.,\n   * due to hidden mutable state).\n   *\n   * @returns A Promise for an array of stream elements, which will resolve\n   *   when the stream is exhausted.\n   */\n  async toArrayForTest(): Promise<T[]> {\n    const stream = this.prefetch(100);\n    const result: T[] = [];\n    let x = await stream.next();\n    while (!x.done) {\n      result.push(x.value);\n      x = await stream.next();\n    }\n    return result;\n  }\n\n  /**\n   * Draw items from the stream until it is exhausted.\n   *\n   * This can be useful when the stream has side effects but no output.  In\n   * that case, calling this function guarantees that the stream will be\n   * fully processed.\n   */\n  async resolveFully(): Promise<void> {\n    let x = await this.next();\n    while (!x.done) {\n      x = await this.next();\n    }\n  }\n\n  /**\n   * Draw items from the stream until it is exhausted, or a predicate fails.\n   *\n   * This can be useful when the stream has side effects but no output.  In\n   * that case, calling this function guarantees that the stream will be\n   * fully processed.\n   */\n  async resolveWhile(predicate: (r: T) => boolean): Promise<void> {\n    let x = await this.next();\n    let shouldContinue = predicate(x.value);\n    while ((!x.done) && shouldContinue) {\n      x = await this.next();\n      shouldContinue = predicate(x.value);\n    }\n  }\n\n  /**\n   * Handles errors thrown on this stream using a provided handler function.\n   *\n   * @param handler A function that handles any `Error` thrown during a `next()`\n   *   call and returns true if the stream should continue (dropping the failed\n   *   call) or false if the stream should quietly terminate.  
If the handler\n   *   itself throws (or rethrows) an `Error`, that will be propagated.\n   *\n   * @returns A `LazyIterator` of elements passed through from upstream,\n   *   possibly filtering or terminating on upstream `next()` calls that\n   *   throw an `Error`.\n   */\n  handleErrors(handler: (error: Error) => boolean): LazyIterator<T> {\n    return new ErrorHandlingLazyIterator(this, handler);\n  }\n\n  // TODO(soergel): Implement reduce() etc.\n\n  /**\n   * Filters this stream according to `predicate`.\n   *\n   * @param predicate A function mapping a stream element to a boolean or a\n   * `Promise` for one.\n   *\n   * @returns A `LazyIterator` of elements for which the predicate was true.\n   */\n  filter(predicate: (value: T) => boolean): LazyIterator<T> {\n    return new FilterIterator(this, predicate);\n  }\n\n  /**\n   * Maps this stream through a 1-to-1 transform.\n   *\n   * @param transform A function mapping a stream element to a transformed\n   *   element.\n   *\n   * @returns A `LazyIterator` of transformed elements.\n   */\n  map<O>(transform: (value: T) => O): LazyIterator<O> {\n    return new MapIterator(this, transform);\n  }\n\n  /**\n   * Maps this stream through an async 1-to-1 transform.\n   *\n   * @param transform A function mapping a stream element to a `Promise` for a\n   *   transformed stream element.\n   *\n   * @returns A `LazyIterator` of transformed elements.\n   */\n  mapAsync<O>(transform: (value: T) => Promise<O>): LazyIterator<O> {\n    return new AsyncMapIterator(this, transform);\n  }\n\n  /**\n   * Maps this stream through a 1-to-1 transform, forcing serial execution.\n   *\n   * @param transform A function mapping a stream element to a transformed\n   *   element.\n   *\n   * @returns A `LazyIterator` of transformed elements.\n   */\n  serialMapAsync<O>(transform: (value: T) => Promise<O>): LazyIterator<O> {\n    return new AsyncMapIterator(this, transform).serial();\n  }\n\n  /**\n   * Maps this stream through a 
1-to-many transform.\n   *\n   * @param transform A function mapping a stream element to an array of\n   *   transformed elements.\n   *\n   * @returns A `DataStream` of transformed elements.\n   */\n  flatmap<O>(transform: (value: T) => O[]): LazyIterator<O> {\n    return new FlatmapIterator(this, transform);\n  }\n\n  /**\n   * Apply a function to every element of the stream.\n   *\n   * @param f A function to apply to each stream element.\n   */\n  async forEachAsync(f: (value: T) => void): Promise<void> {\n    return this.map(f).resolveFully();\n  }\n\n  /**\n   * Apply a function to every element of the stream, forcing serial execution.\n   *\n   * @param f A function to apply to each stream element.  Should return 'true'\n   *   to indicate that the stream should continue, or 'false' to cause it to\n   *   terminate.\n   */\n  async serialForEach(f: (value: T) => Promise<boolean>): Promise<void> {\n    return this.serialMapAsync(f).resolveWhile(x => (x === true));\n  }\n\n  /**\n   * Groups elements into batches, represented as arrays of elements.\n   *\n   * We can think of the elements of this iterator as 'rows' (even if they are\n   * nested structures).  By the same token, consecutive values for a given\n   * key within the elements form a 'column'.  This matches the usual sense of\n   * 'row' and 'column' when processing tabular data (e.g., parsing a CSV).\n   *\n   * Thus, \"Row-major\" means that the resulting batch is simply a collection of\n   * rows: `[row1, row2, row3, ...]`.  This is contrast to the column-major\n   * form, which is needed for vectorized computation.\n   *\n   * @param batchSize The number of elements desired per batch.\n   * @param smallLastBatch Whether to emit the final batch when it has fewer\n   *   than batchSize elements. 
Default true.\n   * @returns A `LazyIterator` of batches of elements, represented as arrays\n   *   of the original element type.\n   */\n  rowMajorBatch(batchSize: number, smallLastBatch = true): LazyIterator<T[]> {\n    return new RowMajorBatchIterator(this, batchSize, smallLastBatch);\n  }\n\n  /**\n   * Groups elements into batches, represented in column-major form.\n   *\n   * We can think of the elements of this iterator as 'rows' (even if they are\n   * nested structures).  By the same token, consecutive values for a given\n   * key within the elements form a 'column'.  This matches the usual sense of\n   * 'row' and 'column' when processing tabular data (e.g., parsing a CSV).\n   *\n   * Thus, \"column-major\" means that the resulting batch is a (potentially\n   * nested) structure representing the columns.  Each column entry, then,\n   * contains a collection of the values found in that column for a range of\n   * input elements.  This representation allows for vectorized computation, in\n   * contrast to the row-major form.\n   *\n   * The inputs should all have the same nested structure (i.e., of arrays and\n   * dicts).  The result is a single object with the same nested structure,\n   * where the leaves are arrays collecting the values of the inputs at that\n   * location (or, optionally, the result of a custom function applied to those\n   * arrays).\n   *\n   * @param batchSize The number of elements desired per batch.\n   * @param smallLastBatch Whether to emit the final batch when it has fewer\n   *   than batchSize elements. Default true.\n   * @param zipFn: (optional) A function that expects an array of elements at a\n   *   single node of the object tree, and returns a `DeepMapResult`.  The\n   *   `DeepMapResult` either provides a result value for that node (i.e.,\n   *   representing the subtree), or indicates that the node should be processed\n   *   recursively.  
The default zipFn recurses as far as possible and places\n   *   arrays at the leaves.\n   * @returns A `LazyIterator` of batches of elements, represented as an object\n   *   with collections at the leaves.\n   */\n  columnMajorBatch(\n      batchSize: number, smallLastBatch = true,\n      // tslint:disable-next-line:no-any\n      zipFn: (xs: any[]) => DeepMapResult = zipToList):\n      LazyIterator<tf.TensorContainer> {\n    // First collect the desired number of input elements as a row-major batch.\n    const rowBatches = this.rowMajorBatch(batchSize, smallLastBatch);\n    // Now 'rotate' or 'pivot' the data, collecting all values from each column\n    // in the batch (i.e., for each key within the elements) into an array.\n    return rowBatches.map(x => deepZip(x, zipFn));\n  }\n\n  /**\n   * Concatenate this `LazyIterator` with another.\n   *\n   * @param iterator A `LazyIterator` to be concatenated onto this one.\n   * @param baseErrorHandler An optional function that can intercept `Error`s\n   *   raised during a `next()` call on the base stream.  This function can\n   *   decide whether the error should be propagated, whether the error should\n   *   be ignored, or whether the base stream should be terminated.\n   * @returns A `LazyIterator`.\n   */\n  concatenate(\n      iterator: LazyIterator<T>,\n      baseErrorHandler?: (e: Error) => boolean): LazyIterator<T> {\n    return new ChainedIterator(\n        iteratorFromItems([this, iterator]), baseErrorHandler);\n  }\n\n  /**\n   * Limits this stream to return at most `count` items.\n   *\n   * @param count The maximum number of items to provide from the stream. 
If\n   * a negative or undefined value is given, the entire stream is returned\n   *   unaltered.\n   */\n  take(count: number): LazyIterator<T> {\n    if (count < 0 || count == null) {\n      return this;\n    }\n    return new TakeIterator(this, count);\n  }\n\n  /**\n   * Skips the first `count` items in this stream.\n   *\n   * @param count The number of items to skip.  If a negative or undefined\n   * value is given, the entire stream is returned unaltered.\n   */\n  skip(count: number): LazyIterator<T> {\n    if (count < 0 || count == null) {\n      return this;\n    }\n    return new SkipIterator(this, count);\n  }\n\n  /**\n   * Prefetch the first `bufferSize` items in this stream.\n   *\n   * Note this prefetches Promises, but makes no guarantees about when those\n   * Promises resolve.\n   *\n   * @param bufferSize: An integer specifying the number of elements to be\n   *   prefetched.\n   */\n  prefetch(bufferSize: number): LazyIterator<T> {\n    return new PrefetchIterator(this, bufferSize);\n  }\n\n  // TODO(soergel): deep sharded shuffle, where supported\n\n  /**\n   * Randomly shuffles the elements of this stream.\n   *\n   * @param bufferSize: An integer specifying the number of elements from\n   * this stream from which the new stream will sample.\n   * @param seed: (Optional.) An integer specifying the random seed that\n   * will be used to create the distribution.\n   */\n  shuffle(windowSize: number, seed?: string): LazyIterator<T> {\n    return new ShuffleIterator(this, windowSize, seed);\n  }\n\n  /**\n   * Force an iterator to execute serially: each next() call will await the\n   * prior one, so that they cannot execute concurrently.\n   */\n  serial(): LazyIterator<T> {\n    return new SerialIterator(this);\n  }\n}\n\n// ============================================================================\n// The following private classes serve to implement the chainable methods\n// on LazyIterator.  
Unfortunately they can't be placed in separate files,\n// due to resulting trouble with circular imports.\n// ============================================================================\n\n// Iterators that just extend LazyIterator directly\n// ============================================================================\n\nclass ArrayIterator<T> extends LazyIterator<T> {\n  private trav = 0;\n  constructor(protected items: T[]) {\n    super();\n  }\n\n  summary() {\n    return `Array of ${this.items.length} items`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    if (this.trav >= this.items.length) {\n      return {value: null, done: true};\n    }\n    const item = this.items[this.trav];\n    this.trav++;\n    return {value: deepClone(item), done: false};\n  }\n}\n\nclass FunctionCallIterator<T> extends LazyIterator<T> {\n  constructor(\n      protected nextFn: () => IteratorResult<T>| Promise<IteratorResult<T>>) {\n    super();\n  }\n\n  summary() {\n    return `Function call`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    try {\n      return this.nextFn();\n    } catch (e) {\n      // Modify the error message but leave the stack trace intact\n      e.message =\n          `Error thrown while iterating through a dataset: ${e.message}`;\n      throw e;\n    }\n  }\n}\n\nclass SerialIterator<T> extends LazyIterator<T> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  constructor(protected upstream: LazyIterator<T>) {\n    super();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Serial`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be 
updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  private async serialNext(): Promise<IteratorResult<T>> {\n    return this.upstream.next();\n  }\n}\n\nclass SkipIterator<T> extends LazyIterator<T> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  // Local state that should not be clobbered by out-of-order execution.\n  count = 0;\n\n  constructor(protected upstream: LazyIterator<T>, protected maxCount: number) {\n    super();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Skip`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  private async serialNext(): Promise<IteratorResult<T>> {\n    // TODO(soergel): consider tradeoffs of reading in parallel, eg.\n    // collecting next() promises in an Array and then waiting for\n    // Promise.all() of those. Benefit: pseudo-parallel execution.  
Drawback:\n    // maybe delayed GC.\n    while (this.count++ < this.maxCount) {\n      const skipped = await this.upstream.next();\n      // short-circuit if upstream is already empty\n      if (skipped.done) {\n        return skipped;\n      }\n      tf.dispose(skipped.value as {});\n    }\n    return this.upstream.next();\n  }\n}\n\nclass TakeIterator<T> extends LazyIterator<T> {\n  count = 0;\n  constructor(protected upstream: LazyIterator<T>, protected maxCount: number) {\n    super();\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Take`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    if (this.count++ >= this.maxCount) {\n      return {value: null, done: true};\n    }\n    return this.upstream.next();\n  }\n}\n\n// Note this batch just groups items into row-wise element arrays.\n// Rotating these to a column-wise representation happens only at the dataset\n// level.\nclass RowMajorBatchIterator<T> extends LazyIterator<T[]> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T[]>>;\n\n  constructor(\n      protected upstream: LazyIterator<T>, protected batchSize: number,\n      protected enableSmallLastBatch = true) {\n    super();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> RowMajorBatch`;\n  }\n\n  async next(): Promise<IteratorResult<T[]>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  private async serialNext(): Promise<IteratorResult<T[]>> {\n    const batch: T[] = [];\n    while (batch.length < this.batchSize) {\n      const item 
= await this.upstream.next();\n      if (item.done) {\n        if (this.enableSmallLastBatch && batch.length > 0) {\n          return {value: batch, done: false};\n        }\n        return {value: null, done: true};\n      }\n      batch.push(item.value);\n    }\n    return {value: batch, done: false};\n  }\n}\n\nclass FilterIterator<T> extends LazyIterator<T> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  constructor(\n      protected upstream: LazyIterator<T>,\n      protected predicate: (value: T) => boolean) {\n    super();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Filter`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  private async serialNext(): Promise<IteratorResult<T>> {\n    while (true) {\n      const item = await this.upstream.next();\n      if (item.done || this.predicate(item.value)) {\n        return item;\n      }\n      tf.dispose(item.value as {});\n    }\n  }\n}\n\nclass MapIterator<I, O> extends LazyIterator<O> {\n  constructor(\n      protected upstream: LazyIterator<I>,\n      protected transform: (value: I) => O) {\n    super();\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Map`;\n  }\n\n  async next(): Promise<IteratorResult<O>> {\n    const item = await this.upstream.next();\n    if (item.done) {\n      return {value: null, done: true};\n    }\n    const inputTensors = tf.tensor_util.getTensorsInContainer(item.value as {});\n    // Careful: the transform 
may mutate the item in place.\n    // That's why we have to remember the input Tensors above, and then\n    // below dispose only those that were not passed through to the output.\n    // Note too that the transform function is responsible for tidying\n    // any intermediate Tensors.  Here we are concerned only about the\n    // inputs.\n    const mapped = this.transform(item.value);\n    const outputTensors = tf.tensor_util.getTensorsInContainer(mapped as {});\n\n    // TODO(soergel) faster intersection\n    // TODO(soergel) move to tf.disposeExcept(in, out)?\n    for (const t of inputTensors) {\n      if (!tf.tensor_util.isTensorInList(t, outputTensors)) {\n        t.dispose();\n      }\n    }\n    return {value: mapped, done: false};\n  }\n}\n\nclass ErrorHandlingLazyIterator<T> extends LazyIterator<T> {\n  count = 0;\n  constructor(\n      protected upstream: LazyIterator<T>,\n      protected handler: (error: Error) => boolean) {\n    super();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> handleErrors`;\n  }\n\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  async serialNext(): Promise<IteratorResult<T>> {\n    while (true) {\n      try {\n        return await this.upstream.next();\n      } catch (e) {\n        if (!this.handler(e)) {\n          return {value: null, done: true};\n        }\n        // If the handler returns true, loop and fetch the next upstream 
item.\n\n        // If the upstream iterator throws an endless stream of errors, and if\n        // the handler says to ignore them, then we loop forever here.  That is\n        // the correct behavior-- it's up to the handler to decide when to stop.\n      }\n    }\n  }\n}\n\nclass AsyncMapIterator<I, O> extends LazyIterator<O> {\n  constructor(\n      protected upstream: LazyIterator<I>,\n      protected transform: (value: I) => Promise<O>) {\n    super();\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> AsyncMap`;\n  }\n\n  async next(): Promise<IteratorResult<O>> {\n    const item = await this.upstream.next();\n    if (item.done) {\n      return {value: null, done: true};\n    }\n    const inputTensors = tf.tensor_util.getTensorsInContainer(item.value as {});\n    // Careful: the transform may mutate the item in place.\n    // That's why we have to remember the input Tensors above, and then\n    // below dispose only those that were not passed through to the output.\n    // Note too that the transform function is responsible for tidying\n    // any intermediate Tensors.  Here we are concerned only about the\n    // inputs.\n    const mapped = await this.transform(item.value);\n    const outputTensors = tf.tensor_util.getTensorsInContainer(mapped as {});\n\n    // TODO(soergel) faster intersection\n    // TODO(soergel) move to tf.disposeExcept(in, out)?\n    for (const t of inputTensors) {\n      if (!tf.tensor_util.isTensorInList(t, outputTensors)) {\n        t.dispose();\n      }\n    }\n    return {value: mapped, done: false};\n  }\n}\n\n// Iterators that maintain a queue of pending items\n// ============================================================================\n\n/**\n * A base class for transforming streams that operate by maintaining an\n * output queue of elements that are ready to return via next().  
This is\n * commonly required when the transformation is 1-to-many:  A call to next()\n * may trigger a call to the underlying stream, which will produce many\n * mapped elements of this stream-- of which we need to return only one, so\n * we have to queue the rest.\n */\nexport abstract class OneToManyIterator<T> extends LazyIterator<T> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  // Local state that should not be clobbered by out-of-order execution.\n  protected outputQueue: RingBuffer<T>;\n\n  constructor() {\n    super();\n    this.outputQueue = new GrowingRingBuffer<T>();\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  /**\n   * Read one or more chunks from upstream and process them, possibly\n   * reading or writing a carryover, and adding processed items to the\n   * output queue.  Note it's possible that no items are added to the queue\n   * on a given pump() call, even if the upstream stream is not closed\n   * (e.g., because items are filtered).\n   *\n   * @return `true` if any action was taken, i.e. fetching items from the\n   *   upstream source OR adding items to the output queue.  
`false` if the\n   *   upstream source is exhausted AND nothing was added to the queue\n   * (i.e., any remaining carryover).\n   */\n  protected abstract pump(): Promise<boolean>;\n\n  async serialNext(): Promise<IteratorResult<T>> {\n    // Fetch so that the queue contains at least one item if possible.\n    // If the upstream source is exhausted, AND there are no items left in\n    // the output queue, then this stream is also exhausted.\n    while (this.outputQueue.length() === 0) {\n      // TODO(soergel): consider parallel reads.\n      if (!await this.pump()) {\n        return {value: null, done: true};\n      }\n    }\n    return {value: this.outputQueue.shift(), done: false};\n  }\n}\nclass FlatmapIterator<I, O> extends OneToManyIterator<O> {\n  constructor(\n      protected upstream: LazyIterator<I>,\n      protected transform: (value: I) => O[]) {\n    super();\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Flatmap`;\n  }\n\n  async pump(): Promise<boolean> {\n    const item = await this.upstream.next();\n    if (item.done) {\n      return false;\n    }\n    const inputTensors = tf.tensor_util.getTensorsInContainer(item.value as {});\n    // Careful: the transform may mutate the item in place.\n    // that's why we have to remember the input Tensors above, and then\n    // below dispose only those that were not passed through to the output.\n    // Note too that the transform function is responsible for tidying any\n    // intermediate Tensors.  
Here we are concerned only about the inputs.\n    const mappedArray = this.transform(item.value);\n    const outputTensors =\n        tf.tensor_util.getTensorsInContainer(mappedArray as {});\n    this.outputQueue.pushAll(mappedArray);\n\n    // TODO(soergel) faster intersection, and deduplicate outputTensors\n    // TODO(soergel) move to tf.disposeExcept(in, out)?\n    for (const t of inputTensors) {\n      if (!tf.tensor_util.isTensorInList(t, outputTensors)) {\n        t.dispose();\n      }\n    }\n\n    return true;\n  }\n}\n\n/**\n * Provides a `LazyIterator` that concatenates a stream of underlying\n * streams.\n *\n * Doing this in a concurrency-safe way requires some trickery.  In\n * particular, we want this stream to return the elements from the\n * underlying streams in the correct order according to when next() was\n * called, even if the resulting Promises resolve in a different order.\n */\nexport class ChainedIterator<T> extends LazyIterator<T> {\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>> = null;\n\n  // Local state that should not be clobbered by out-of-order execution.\n  private iterator: LazyIterator<T> = null;\n  private moreIterators: LazyIterator<LazyIterator<T>>;\n\n  constructor(\n      iterators: LazyIterator<LazyIterator<T>>,\n      private readonly baseErrorHandler?: (e: Error) => boolean) {\n    super();\n    this.moreIterators = iterators;\n  }\n\n  summary() {\n    const upstreamSummaries = 'TODO: fill in upstream of chained summaries';\n    return `${upstreamSummaries} -> Chained`;\n  }\n\n  async next(): Promise<IteratorResult<T>> {\n    this.lastRead = this.readFromChain(this.lastRead);\n    return this.lastRead;\n  }\n\n  private async readFromChain(lastRead: Promise<IteratorResult<T>>):\n      Promise<IteratorResult<T>> {\n    // Must await on the previous read since the previous read may have advanced\n    // the 
stream of streams, from which we need to read.\n    // This is unfortunate since we can't parallelize reads. Which means\n    // prefetching of chained streams is a no-op.\n    // One solution is to prefetch immediately upstream of this.\n    await lastRead;\n    if (this.iterator == null) {\n      const iteratorResult = await this.moreIterators.next();\n      if (iteratorResult.done) {\n        // No more streams to stream from.\n        return {value: null, done: true};\n      }\n      this.iterator = iteratorResult.value;\n      if (this.baseErrorHandler != null) {\n        this.iterator = this.iterator.handleErrors(this.baseErrorHandler);\n      }\n    }\n    const itemResult = await this.iterator.next();\n    if (itemResult.done) {\n      this.iterator = null;\n      return this.readFromChain(lastRead);\n    }\n    return itemResult;\n  }\n}\n\nexport enum ZipMismatchMode {\n  FAIL,      // require zipped streams to have the same length\n  SHORTEST,  // terminate zip when the first stream is exhausted\n  LONGEST    // use nulls for exhausted streams; use up the longest stream.\n}\n\n/**\n * Provides a `LazyIterator` that zips together an array, dict, or nested\n * structure of `LazyIterator`s (and perhaps additional constants).\n *\n * The underlying streams must provide elements in a consistent order such\n * that they correspond.\n *\n * Typically, the underlying streams should have the same number of\n * elements. If they do not, the behavior is determined by the\n * `mismatchMode` argument.\n *\n * The nested structure of the `iterators` argument determines the\n * structure of elements in the resulting iterator.\n *\n * Doing this in a concurrency-safe way requires some trickery.  
In\n * particular, we want this stream to return the elements from the\n * underlying streams in the correct order according to when next() was\n * called, even if the resulting Promises resolve in a different order.\n *\n * @param iterators: An array or object containing LazyIterators at the\n * leaves.\n * @param mismatchMode: Determines what to do when one underlying iterator\n * is exhausted before the others.  `ZipMismatchMode.FAIL` (the default)\n * causes an error to be thrown in this case.  `ZipMismatchMode.SHORTEST`\n * causes the zipped iterator to terminate with the furst underlying\n * streams, so elements remaining on the longer streams are ignored.\n * `ZipMismatchMode.LONGEST` causes the zipped stream to continue, filling\n * in nulls for the exhausted streams, until all streams are exhausted.\n */\nclass ZipIterator<O extends tf.TensorContainer> extends LazyIterator<O> {\n  private count = 0;\n  private currentPromise: Promise<IteratorResult<O>> = null;\n\n  constructor(\n      protected readonly iterators: IteratorContainer,\n      protected readonly mismatchMode: ZipMismatchMode = ZipMismatchMode.FAIL) {\n    super();\n  }\n\n  summary() {\n    const upstreamSummaries = 'TODO: fill in upstream of zip summaries';\n    return `{${upstreamSummaries}} -> Zip`;\n  }\n\n  private async nextState(afterState: Promise<IteratorResult<O>>):\n      Promise<IteratorResult<O>> {\n    // This chaining ensures that the underlying next() are not even called\n    // before the previous ones have resolved.\n    await afterState;\n\n    // Collect underlying iterator \"done\" signals as a side effect in\n    // getNext()\n    let numIterators = 0;\n    let iteratorsDone = 0;\n\n    function getNext(container: IteratorContainer): DeepMapAsyncResult {\n      if (container instanceof LazyIterator) {\n        const result = container.next();\n        return {\n          value: result.then(x => {\n            numIterators++;\n            if (x.done) {\n              
iteratorsDone++;\n            }\n            return x.value;\n          }),\n          recurse: false\n        };\n      } else {\n        return {value: null, recurse: true};\n      }\n    }\n\n    const mapped: O = await deepMapAndAwaitAll(this.iterators, getNext);\n\n    if (numIterators === iteratorsDone) {\n      // The streams have all ended.\n      return {value: null, done: true};\n    }\n    if (iteratorsDone > 0) {\n      switch (this.mismatchMode) {\n        case ZipMismatchMode.FAIL:\n          throw new Error(\n              'Zipped streams should have the same length. ' +\n              `Mismatched at element ${this.count}.`);\n        case ZipMismatchMode.SHORTEST:\n          return {value: null, done: true};\n        case ZipMismatchMode.LONGEST:\n        default:\n          // Continue.  The exhausted streams already produced value: null.\n      }\n    }\n\n    this.count++;\n    return {value: mapped, done: false};\n  }\n\n  async next(): Promise<IteratorResult<O>> {\n    this.currentPromise = this.nextState(this.currentPromise);\n    return this.currentPromise;\n  }\n}\n\n// Iterators that maintain a ring buffer of pending promises\n// ============================================================================\n\n/**\n * A stream that prefetches a given number of items from an upstream source,\n * returning them in FIFO order.\n *\n * Note this prefetches Promises, but makes no guarantees about when those\n * Promises resolve.\n */\nexport class PrefetchIterator<T> extends LazyIterator<T> {\n  protected buffer: RingBuffer<Promise<IteratorResult<T>>>;\n\n  constructor(\n      protected upstream: LazyIterator<T>, protected bufferSize: number) {\n    super();\n    this.buffer = new RingBuffer<Promise<IteratorResult<T>>>(bufferSize);\n  }\n\n  summary() {\n    return `${this.upstream.summary()} -> Prefetch`;\n  }\n\n  /**\n   * Refill the prefetch buffer.  
Returns only after the buffer is full, or\n   * the upstream source is exhausted.\n   */\n  protected refill() {\n    while (!this.buffer.isFull()) {\n      const v = this.upstream.next();\n      this.buffer.push(v);\n    }\n  }\n\n  next(): Promise<IteratorResult<T>> {\n    this.refill();\n    // This shift will never throw an error because the buffer is always\n    // full after a refill. If the stream is exhausted, the buffer will be\n    // full of Promises that will resolve to the end-of-stream signal.\n    return this.buffer.shift();\n  }\n}\n\n/**\n * A stream that performs a sliding-window random shuffle on an upstream\n * source. This is like a `PrefetchIterator` except that the items are\n * returned in randomized order.  Mixing naturally improves as the buffer\n * size increases.\n */\nexport class ShuffleIterator<T> extends PrefetchIterator<T> {\n  private readonly random: seedrandom.prng;\n\n  // Strict Promise execution order:\n  // a next() call may not even begin until the previous one completes.\n  private lastRead: Promise<IteratorResult<T>>;\n\n  // Local state that should not be clobbered by out-of-order execution.\n  private upstreamExhausted = false;\n\n  constructor(\n    protected override upstream: LazyIterator<T>, protected windowSize: number,\n      seed?: string) {\n    super(upstream, windowSize);\n    this.random = seedrandom.alea(seed || tf.util.now().toString());\n    this.lastRead = Promise.resolve({value: null, done: false});\n  }\n\n  override async next(): Promise<IteratorResult<T>> {\n    // This sets this.lastRead to a new Promise right away, as opposed to\n    // saying `await this.lastRead; this.lastRead = this.serialNext();` which\n    // would not work because this.nextRead would be updated only after the\n    // promise resolves.\n    this.lastRead = this.lastRead.then(() => this.serialNext());\n    return this.lastRead;\n  }\n\n  private randomInt(max: number) {\n    return Math.floor(this.random() * max);\n  }\n\n  
protected chooseIndex(): number {\n    return this.randomInt(this.buffer.length());\n  }\n\n  async serialNext(): Promise<IteratorResult<T>> {\n    // TODO(soergel): consider performance\n    if (!this.upstreamExhausted) {\n      this.refill();\n    }\n    while (!this.buffer.isEmpty()) {\n      const chosenIndex = this.chooseIndex();\n      const result = await this.buffer.shuffleExcise(chosenIndex);\n      if (result.done) {\n        this.upstreamExhausted = true;\n      } else {\n        this.refill();\n        return result;\n      }\n    }\n    return {value: null, done: true};\n  }\n}\n"]}