File
elementCountThreshold
|
elementCountThreshold: number
|
Type : number
|
requestByteThreshold
|
requestByteThreshold: number
|
Type : number
|
import {status} from '@grpc/grpc-js';
import {SimpleCallbackFunction} from '../apitypes';
import {GoogleError} from '../googleError';
import {warn} from '../warnings';
import {BundleDescriptor} from './bundleDescriptor';
import {computeBundleId} from './bundlingUtils';
import {Task, TaskCallback} from './task';
function noop() {}
export interface BundleOptions {
elementCountLimit: number;
requestByteLimit: number;
elementCountThreshold: number;
requestByteThreshold: number;
delayThreshold: number;
}
/**
* BundleExecutor stores several timers for each bundle (calls are bundled based
* on the options passed, each bundle has unique ID that is calculated based on
* field values). Each timer fires and sends a call after certain amount of
* time, and if a new request comes to the same bundle, the timer can be
* restarted.
*/
export class BundleExecutor {
_options: BundleOptions;
_descriptor: BundleDescriptor;
_tasks: {[index: string]: Task};
_timers: {[index: string]: NodeJS.Timer};
_invocations: {[index: string]: string};
_invocationId: number;
/**
* Organizes requests for an api service that requires to bundle them.
*
* @param {BundleOptions} bundleOptions - configures strategy this instance
* uses when executing bundled functions.
* @param {BundleDescriptor} bundleDescriptor - the description of the bundling.
* @constructor
*/
constructor(
bundleOptions: BundleOptions,
bundleDescriptor: BundleDescriptor
) {
this._options = bundleOptions;
this._descriptor = bundleDescriptor;
this._tasks = {};
this._timers = {};
this._invocations = {};
this._invocationId = 0;
}
/**
* Schedule a method call.
*
* @param {function} apiCall - the function for an API call.
* @param {Object} request - the request object to be bundled with others.
* @param {APICallback} callback - the callback to be called when the method finished.
* @return {function()} - the function to cancel the scheduled invocation.
*/
schedule(
apiCall: SimpleCallbackFunction,
request: {[index: string]: Array<{}> | string},
callback?: TaskCallback
) {
const bundleId = computeBundleId(
request,
this._descriptor.requestDiscriminatorFields
);
callback = (callback || noop) as TaskCallback;
if (bundleId === undefined) {
warn(
'bundling_schedule_bundleid_undefined',
'The request does not have enough information for request bundling. ' +
`Invoking immediately. Request: ${JSON.stringify(request)} ` +
`discriminator fields: ${this._descriptor.requestDiscriminatorFields}`
);
return apiCall(request, callback);
}
if (request[this._descriptor.bundledField] === undefined) {
warn(
'bundling_no_bundled_field',
`Request does not contain field ${
this._descriptor.bundledField
} that must present for bundling. ` +
`Invoking immediately. Request: ${JSON.stringify(request)}`
);
return apiCall(request, callback);
}
if (!(bundleId in this._tasks)) {
this._tasks[bundleId] = new Task(
apiCall,
request,
this._descriptor.bundledField,
this._descriptor.subresponseField
);
}
let task = this._tasks[bundleId];
callback.id = String(this._invocationId++);
this._invocations[callback.id] = bundleId;
const bundledField = request[this._descriptor.bundledField] as Array<{}>;
const elementCount = bundledField.length;
let requestBytes = 0;
const self = this;
bundledField.forEach(obj => {
requestBytes += this._descriptor.byteLengthFunction(obj);
});
const countLimit = this._options.elementCountLimit || 0;
const byteLimit = this._options.requestByteLimit || 0;
if (
(countLimit > 0 && elementCount > countLimit) ||
(byteLimit > 0 && requestBytes >= byteLimit)
) {
let message;
if (countLimit > 0 && elementCount > countLimit) {
message =
'The number of elements ' +
elementCount +
' exceeds the limit ' +
this._options.elementCountLimit;
} else {
message =
'The required bytes ' +
requestBytes +
' exceeds the limit ' +
this._options.requestByteLimit;
}
const error = new GoogleError(message);
error.code = status.INVALID_ARGUMENT;
callback(error);
return {
cancel: noop,
};
}
const existingCount = task.getElementCount();
const existingBytes = task.getRequestByteSize();
if (
(countLimit > 0 && elementCount + existingCount >= countLimit) ||
(byteLimit > 0 && requestBytes + existingBytes >= byteLimit)
) {
this._runNow(bundleId);
this._tasks[bundleId] = new Task(
apiCall,
request,
this._descriptor.bundledField,
this._descriptor.subresponseField
);
task = this._tasks[bundleId];
}
task.extend(bundledField, requestBytes, callback);
const ret = {
cancel() {
self._cancel(callback!.id!);
},
};
const countThreshold = this._options.elementCountThreshold || 0;
const sizeThreshold = this._options.requestByteThreshold || 0;
if (
(countThreshold > 0 && task.getElementCount() >= countThreshold) ||
(sizeThreshold > 0 && task.getRequestByteSize() >= sizeThreshold)
) {
this._runNow(bundleId);
return ret;
}
if (!(bundleId in this._timers) && this._options.delayThreshold > 0) {
this._timers[bundleId] = setTimeout(() => {
delete this._timers[bundleId];
this._runNow(bundleId);
}, this._options.delayThreshold);
}
return ret;
}
/**
* Clears scheduled timeout if it exists.
*
* @param {String} bundleId - the id for the task whose timeout needs to be
* cleared.
* @private
*/
private _maybeClearTimeout(bundleId: string) {
if (bundleId in this._timers) {
const timerId = this._timers[bundleId];
delete this._timers[bundleId];
clearTimeout(timerId);
}
}
/**
* Cancels an event.
*
* @param {String} id - The id for the event in the task.
* @private
*/
private _cancel(id: string) {
if (!(id in this._invocations)) {
return;
}
const bundleId = this._invocations[id];
if (!(bundleId in this._tasks)) {
return;
}
const task = this._tasks[bundleId];
delete this._invocations[id];
if (task.cancel(id)) {
this._maybeClearTimeout(bundleId);
delete this._tasks[bundleId];
}
}
/**
* Invokes a task.
*
* @param {String} bundleId - The id for the task.
* @private
*/
_runNow(bundleId: string) {
if (!(bundleId in this._tasks)) {
warn('bundle_runnow_bundleid_unknown', `No such bundleid: ${bundleId}`);
return;
}
this._maybeClearTimeout(bundleId);
const task = this._tasks[bundleId];
delete this._tasks[bundleId];
task.run().forEach(id => {
delete this._invocations[id];
});
}
}