File

src/bundlingCalls/task.ts

import {status} from '@grpc/grpc-js';

import {APICallback, GRPCCallResult, SimpleCallbackFunction} from '../apitypes';
import {GoogleError} from '../googleError';

export interface SubResponseInfo {
  field: string;
  start?: number;
  end?: number;
}

export interface TaskElement {}

export interface TaskData {
  elements: TaskElement[];
  bytes: number;
  callback: TaskCallback;
  cancelled?: boolean;
}

export interface TaskCallback extends APICallback {
  id?: string;
}

/**
 * Creates a deep copy of the object with the consideration of subresponse
 * fields for bundling.
 *
 * @param {Object} obj - The source object.
 * @param {Object?} subresponseInfo - The information to copy the subset of
 *   the field for the response. Do nothing if it's null.
 * @param {String} subresponseInfo.field - The field name.
 * @param {number} subresponseInfo.start - The offset where the copying
 *   element should starts with.
 * @param {number} subresponseInfo.end - The ending index where the copying
 *   region of the elements ends.
 * @return {Object} The copied object.
 * @private
 */
export function deepCopyForResponse(
  // tslint:disable-next-line no-any
  obj: any,
  subresponseInfo: SubResponseInfo | null
) {
  // tslint:disable-next-line no-any
  let result: any;
  if (obj === null) {
    return null;
  }
  if (obj === undefined) {
    return undefined;
  }
  if (Array.isArray(obj)) {
    result = [];
    obj.forEach(element => {
      result.push(deepCopyForResponse(element, null));
    });
    return result;
  }
  // Some objects (such as ByteBuffer) have copy method.
  if (obj.copy !== undefined) {
    return obj.copy();
  }
  // ArrayBuffer should be copied through slice().
  if (obj instanceof ArrayBuffer) {
    return (obj as ArrayBuffer).slice(0);
  }
  if (typeof obj === 'object') {
    result = {};
    Object.keys(obj).forEach(key => {
      if (
        subresponseInfo &&
        key === subresponseInfo.field &&
        Array.isArray(obj[key])
      ) {
        // Note that subresponses are not deep-copied. This is safe because
        // those subresponses are not shared among callbacks.
        result[key] = obj[key].slice(
          subresponseInfo.start,
          subresponseInfo.end
        );
      } else {
        result[key] = deepCopyForResponse(obj[key], null);
      }
    });
    return result;
  }
  return obj;
}

export class Task {
  _apiCall: SimpleCallbackFunction;
  _request: {[index: string]: TaskElement[]};
  _bundledField: string;
  _subresponseField?: string | null;
  _data: TaskData[];
  callCanceller?: GRPCCallResult;
  /**
   * A task coordinates the execution of a single bundle.
   *
   * @param {function} apiCall - The function to conduct calling API.
   * @param {Object} bundlingRequest - The base request object to be used
   *   for the actual API call.
   * @param {string} bundledField - The name of the field in bundlingRequest
   *   to be bundled.
   * @param {string=} subresponseField - The name of the field in the response
   *   to be passed to the callback.
   * @constructor
   * @private
   */
  constructor(
    apiCall: SimpleCallbackFunction,
    bundlingRequest: {},
    bundledField: string,
    subresponseField?: string | null
  ) {
    this._apiCall = apiCall;
    this._request = bundlingRequest;
    this._bundledField = bundledField;
    this._subresponseField = subresponseField;
    this._data = [];
  }
  /**
   * Returns the number of elements in a task.
   * @return {number} The number of elements.
   */
  getElementCount() {
    let count = 0;
    for (let i = 0; i < this._data.length; ++i) {
      count += this._data[i].elements.length;
    }
    return count;
  }
  /**
   * Returns the total byte size of the elements in a task.
   * @return {number} The byte size.
   */
  getRequestByteSize() {
    let size = 0;
    for (let i = 0; i < this._data.length; ++i) {
      size += this._data[i].bytes;
    }
    return size;
  }
  /**
   * Invokes the actual API call with current elements.
   * @return {string[]} - the list of ids for invocations to be run.
   */
  run() {
    if (this._data.length === 0) {
      return [];
    }
    const request = this._request;
    const elements: TaskElement[] = [];
    const ids: string[] = [];
    for (let i = 0; i < this._data.length; ++i) {
      elements.push.apply(elements, this._data[i].elements);
      ids.push(this._data[i].callback.id!);
    }
    request[this._bundledField] = elements;
    const self = this;
    this.callCanceller = this._apiCall(
      request,
      (err: GoogleError | null, response?: {} | null) => {
        const responses: Array<{} | undefined> = [];
        if (err) {
          self._data.forEach(() => {
            responses.push(undefined);
          });
        } else {
          let subresponseInfo: SubResponseInfo | null = null;
          if (self._subresponseField) {
            subresponseInfo = {
              field: self._subresponseField,
              start: 0,
            };
          }
          self._data.forEach(data => {
            if (subresponseInfo) {
              subresponseInfo.end =
                subresponseInfo.start! + data.elements.length;
            }
            responses.push(deepCopyForResponse(response, subresponseInfo));
            if (subresponseInfo) {
              subresponseInfo.start = subresponseInfo.end;
            }
          });
        }
        for (let i = 0; i < self._data.length; ++i) {
          if (self._data[i].cancelled) {
            const error = new GoogleError('cancelled');
            error.code = status.CANCELLED;
            self._data[i].callback(error);
          } else {
            self._data[i].callback(err, responses[i]);
          }
        }
      }
    );
    return ids;
  }
  /**
   * Appends the list of elements into the task.
   * @param {Object[]} elements - the new list of elements.
   * @param {number} bytes - the byte size required to encode elements in the API.
   * @param {APICallback} callback - the callback of the method call.
   */
  extend(elements: TaskElement[], bytes: number, callback: TaskCallback) {
    this._data.push({
      elements,
      bytes,
      callback,
    });
  }
  /**
   * Cancels a part of elements.
   * @param {string} id - The identifier of the part of elements.
   * @return {boolean} Whether the entire task will be canceled or not.
   */
  cancel(id: string) {
    if (this.callCanceller) {
      let allCancelled = true;
      this._data.forEach(d => {
        if (d.callback.id === id) {
          d.cancelled = true;
        }
        if (!d.cancelled) {
          allCancelled = false;
        }
      });
      if (allCancelled) {
        this.callCanceller.cancel();
      }
      return allCancelled;
    }
    for (let i = 0; i < this._data.length; ++i) {
      if (this._data[i].callback.id === id) {
        const error = new GoogleError('cancelled');
        error.code = status.CANCELLED;
        this._data[i].callback(error);
        this._data.splice(i, 1);
        break;
      }
    }
    return this._data.length === 0;
  }
}

result-matching ""

    No results matching ""