File

src/bundlingCalls/bundleExecutor.ts

Index

Properties

Properties

delayThreshold
delayThreshold: number
Type : number
elementCountLimit
elementCountLimit: number
Type : number
elementCountThreshold
elementCountThreshold: number
Type : number
requestByteLimit
requestByteLimit: number
Type : number
requestByteThreshold
requestByteThreshold: number
Type : number
import {status} from '@grpc/grpc-js';

import {SimpleCallbackFunction} from '../apitypes';
import {GoogleError} from '../googleError';
import {warn} from '../warnings';

import {BundleDescriptor} from './bundleDescriptor';
import {computeBundleId} from './bundlingUtils';
import {Task, TaskCallback} from './task';

function noop() {}

export interface BundleOptions {
  elementCountLimit: number;
  requestByteLimit: number;
  elementCountThreshold: number;
  requestByteThreshold: number;
  delayThreshold: number;
}

/**
 * BundleExecutor stores several timers for each bundle (calls are bundled based
 * on the options passed, each bundle has unique ID that is calculated based on
 * field values). Each timer fires and sends a call after certain amount of
 * time, and if a new request comes to the same bundle, the timer can be
 * restarted.
 */
export class BundleExecutor {
  _options: BundleOptions;
  _descriptor: BundleDescriptor;
  _tasks: {[index: string]: Task};
  _timers: {[index: string]: NodeJS.Timer};
  _invocations: {[index: string]: string};
  _invocationId: number;
  /**
   * Organizes requests for an api service that requires to bundle them.
   *
   * @param {BundleOptions} bundleOptions - configures strategy this instance
   *   uses when executing bundled functions.
   * @param {BundleDescriptor} bundleDescriptor - the description of the bundling.
   * @constructor
   */
  constructor(
    bundleOptions: BundleOptions,
    bundleDescriptor: BundleDescriptor
  ) {
    this._options = bundleOptions;
    this._descriptor = bundleDescriptor;
    this._tasks = {};
    this._timers = {};
    this._invocations = {};
    this._invocationId = 0;
  }

  /**
   * Schedule a method call.
   *
   * @param {function} apiCall - the function for an API call.
   * @param {Object} request - the request object to be bundled with others.
   * @param {APICallback} callback - the callback to be called when the method finished.
   * @return {function()} - the function to cancel the scheduled invocation.
   */
  schedule(
    apiCall: SimpleCallbackFunction,
    request: {[index: string]: Array<{}> | string},
    callback?: TaskCallback
  ) {
    const bundleId = computeBundleId(
      request,
      this._descriptor.requestDiscriminatorFields
    );
    callback = (callback || noop) as TaskCallback;
    if (bundleId === undefined) {
      warn(
        'bundling_schedule_bundleid_undefined',
        'The request does not have enough information for request bundling. ' +
          `Invoking immediately. Request: ${JSON.stringify(request)} ` +
          `discriminator fields: ${this._descriptor.requestDiscriminatorFields}`
      );
      return apiCall(request, callback);
    }
    if (request[this._descriptor.bundledField] === undefined) {
      warn(
        'bundling_no_bundled_field',
        `Request does not contain field ${
          this._descriptor.bundledField
        } that must present for bundling. ` +
          `Invoking immediately. Request: ${JSON.stringify(request)}`
      );
      return apiCall(request, callback);
    }

    if (!(bundleId in this._tasks)) {
      this._tasks[bundleId] = new Task(
        apiCall,
        request,
        this._descriptor.bundledField,
        this._descriptor.subresponseField
      );
    }
    let task = this._tasks[bundleId];
    callback.id = String(this._invocationId++);
    this._invocations[callback.id] = bundleId;

    const bundledField = request[this._descriptor.bundledField] as Array<{}>;
    const elementCount = bundledField.length;
    let requestBytes = 0;
    const self = this;
    bundledField.forEach(obj => {
      requestBytes += this._descriptor.byteLengthFunction(obj);
    });

    const countLimit = this._options.elementCountLimit || 0;
    const byteLimit = this._options.requestByteLimit || 0;

    if (
      (countLimit > 0 && elementCount > countLimit) ||
      (byteLimit > 0 && requestBytes >= byteLimit)
    ) {
      let message;
      if (countLimit > 0 && elementCount > countLimit) {
        message =
          'The number of elements ' +
          elementCount +
          ' exceeds the limit ' +
          this._options.elementCountLimit;
      } else {
        message =
          'The required bytes ' +
          requestBytes +
          ' exceeds the limit ' +
          this._options.requestByteLimit;
      }
      const error = new GoogleError(message);
      error.code = status.INVALID_ARGUMENT;
      callback(error);
      return {
        cancel: noop,
      };
    }

    const existingCount = task.getElementCount();
    const existingBytes = task.getRequestByteSize();

    if (
      (countLimit > 0 && elementCount + existingCount >= countLimit) ||
      (byteLimit > 0 && requestBytes + existingBytes >= byteLimit)
    ) {
      this._runNow(bundleId);
      this._tasks[bundleId] = new Task(
        apiCall,
        request,
        this._descriptor.bundledField,
        this._descriptor.subresponseField
      );
      task = this._tasks[bundleId];
    }

    task.extend(bundledField, requestBytes, callback);
    const ret = {
      cancel() {
        self._cancel(callback!.id!);
      },
    };

    const countThreshold = this._options.elementCountThreshold || 0;
    const sizeThreshold = this._options.requestByteThreshold || 0;
    if (
      (countThreshold > 0 && task.getElementCount() >= countThreshold) ||
      (sizeThreshold > 0 && task.getRequestByteSize() >= sizeThreshold)
    ) {
      this._runNow(bundleId);
      return ret;
    }

    if (!(bundleId in this._timers) && this._options.delayThreshold > 0) {
      this._timers[bundleId] = setTimeout(() => {
        delete this._timers[bundleId];
        this._runNow(bundleId);
      }, this._options.delayThreshold);
    }

    return ret;
  }

  /**
   * Clears scheduled timeout if it exists.
   *
   * @param {String} bundleId - the id for the task whose timeout needs to be
   *   cleared.
   * @private
   */
  private _maybeClearTimeout(bundleId: string) {
    if (bundleId in this._timers) {
      const timerId = this._timers[bundleId];
      delete this._timers[bundleId];
      clearTimeout(timerId);
    }
  }

  /**
   * Cancels an event.
   *
   * @param {String} id - The id for the event in the task.
   * @private
   */
  private _cancel(id: string) {
    if (!(id in this._invocations)) {
      return;
    }
    const bundleId = this._invocations[id];
    if (!(bundleId in this._tasks)) {
      return;
    }

    const task = this._tasks[bundleId];
    delete this._invocations[id];
    if (task.cancel(id)) {
      this._maybeClearTimeout(bundleId);
      delete this._tasks[bundleId];
    }
  }

  /**
   * Invokes a task.
   *
   * @param {String} bundleId - The id for the task.
   * @private
   */
  _runNow(bundleId: string) {
    if (!(bundleId in this._tasks)) {
      warn('bundle_runnow_bundleid_unknown', `No such bundleid: ${bundleId}`);
      return;
    }
    this._maybeClearTimeout(bundleId);
    const task = this._tasks[bundleId];
    delete this._tasks[bundleId];

    task.run().forEach(id => {
      delete this._invocations[id];
    });
  }
}

result-matching ""

    No results matching ""