src/bundlingCalls/bundleExecutor.ts
BundleExecutor stores several timers for each bundle (calls are bundled based on the options passed, each bundle has unique ID that is calculated based on field values). Each timer fires and sends a call after certain amount of time, and if a new request comes to the same bundle, the timer can be restarted.
Properties |
Methods |
constructor(bundleOptions: BundleOptions, bundleDescriptor: BundleDescriptor)
|
||||||||||||
Defined in src/bundlingCalls/bundleExecutor.ts:65
|
||||||||||||
Organizes requests for an api service that requires to bundle them.
Parameters :
|
_descriptor |
Type : BundleDescriptor
|
Defined in src/bundlingCalls/bundleExecutor.ts:61
|
_invocationId |
Type : number
|
Defined in src/bundlingCalls/bundleExecutor.ts:65
|
_invocations |
Type : literal type
|
Defined in src/bundlingCalls/bundleExecutor.ts:64
|
_options |
Type : BundleOptions
|
Defined in src/bundlingCalls/bundleExecutor.ts:60
|
_tasks |
Type : literal type
|
Defined in src/bundlingCalls/bundleExecutor.ts:62
|
_timers |
Type : literal type
|
Defined in src/bundlingCalls/bundleExecutor.ts:63
|
_runNow | ||||||||
_runNow(bundleId: string)
|
||||||||
Defined in src/bundlingCalls/bundleExecutor.ts:261
|
||||||||
Invokes a task.
Parameters :
Returns :
void
|
schedule | ||||||||||||||||
schedule(apiCall: SimpleCallbackFunction, request: literal type, callback?: TaskCallback)
|
||||||||||||||||
Defined in src/bundlingCalls/bundleExecutor.ts:94
|
||||||||||||||||
Schedule a method call.
Parameters :
Returns :
any
|
import {status} from '@grpc/grpc-js';
import {SimpleCallbackFunction} from '../apitypes';
import {GoogleError} from '../googleError';
import {warn} from '../warnings';
import {BundleDescriptor} from './bundleDescriptor';
import {computeBundleId} from './bundlingUtils';
import {Task, TaskCallback} from './task';
function noop() {}
export interface BundleOptions {
elementCountLimit: number;
requestByteLimit: number;
elementCountThreshold: number;
requestByteThreshold: number;
delayThreshold: number;
}
/**
* BundleExecutor stores several timers for each bundle (calls are bundled based
* on the options passed, each bundle has unique ID that is calculated based on
* field values). Each timer fires and sends a call after certain amount of
* time, and if a new request comes to the same bundle, the timer can be
* restarted.
*/
export class BundleExecutor {
_options: BundleOptions;
_descriptor: BundleDescriptor;
_tasks: {[index: string]: Task};
_timers: {[index: string]: NodeJS.Timer};
_invocations: {[index: string]: string};
_invocationId: number;
/**
* Organizes requests for an api service that requires to bundle them.
*
* @param {BundleOptions} bundleOptions - configures strategy this instance
* uses when executing bundled functions.
* @param {BundleDescriptor} bundleDescriptor - the description of the bundling.
* @constructor
*/
constructor(
bundleOptions: BundleOptions,
bundleDescriptor: BundleDescriptor
) {
this._options = bundleOptions;
this._descriptor = bundleDescriptor;
this._tasks = {};
this._timers = {};
this._invocations = {};
this._invocationId = 0;
}
/**
* Schedule a method call.
*
* @param {function} apiCall - the function for an API call.
* @param {Object} request - the request object to be bundled with others.
* @param {APICallback} callback - the callback to be called when the method finished.
* @return {function()} - the function to cancel the scheduled invocation.
*/
schedule(
apiCall: SimpleCallbackFunction,
request: {[index: string]: Array<{}> | string},
callback?: TaskCallback
) {
const bundleId = computeBundleId(
request,
this._descriptor.requestDiscriminatorFields
);
callback = (callback || noop) as TaskCallback;
if (bundleId === undefined) {
warn(
'bundling_schedule_bundleid_undefined',
'The request does not have enough information for request bundling. ' +
`Invoking immediately. Request: ${JSON.stringify(request)} ` +
`discriminator fields: ${this._descriptor.requestDiscriminatorFields}`
);
return apiCall(request, callback);
}
if (request[this._descriptor.bundledField] === undefined) {
warn(
'bundling_no_bundled_field',
`Request does not contain field ${
this._descriptor.bundledField
} that must present for bundling. ` +
`Invoking immediately. Request: ${JSON.stringify(request)}`
);
return apiCall(request, callback);
}
if (!(bundleId in this._tasks)) {
this._tasks[bundleId] = new Task(
apiCall,
request,
this._descriptor.bundledField,
this._descriptor.subresponseField
);
}
let task = this._tasks[bundleId];
callback.id = String(this._invocationId++);
this._invocations[callback.id] = bundleId;
const bundledField = request[this._descriptor.bundledField] as Array<{}>;
const elementCount = bundledField.length;
let requestBytes = 0;
const self = this;
bundledField.forEach(obj => {
requestBytes += this._descriptor.byteLengthFunction(obj);
});
const countLimit = this._options.elementCountLimit || 0;
const byteLimit = this._options.requestByteLimit || 0;
if (
(countLimit > 0 && elementCount > countLimit) ||
(byteLimit > 0 && requestBytes >= byteLimit)
) {
let message;
if (countLimit > 0 && elementCount > countLimit) {
message =
'The number of elements ' +
elementCount +
' exceeds the limit ' +
this._options.elementCountLimit;
} else {
message =
'The required bytes ' +
requestBytes +
' exceeds the limit ' +
this._options.requestByteLimit;
}
const error = new GoogleError(message);
error.code = status.INVALID_ARGUMENT;
callback(error);
return {
cancel: noop,
};
}
const existingCount = task.getElementCount();
const existingBytes = task.getRequestByteSize();
if (
(countLimit > 0 && elementCount + existingCount >= countLimit) ||
(byteLimit > 0 && requestBytes + existingBytes >= byteLimit)
) {
this._runNow(bundleId);
this._tasks[bundleId] = new Task(
apiCall,
request,
this._descriptor.bundledField,
this._descriptor.subresponseField
);
task = this._tasks[bundleId];
}
task.extend(bundledField, requestBytes, callback);
const ret = {
cancel() {
self._cancel(callback!.id!);
},
};
const countThreshold = this._options.elementCountThreshold || 0;
const sizeThreshold = this._options.requestByteThreshold || 0;
if (
(countThreshold > 0 && task.getElementCount() >= countThreshold) ||
(sizeThreshold > 0 && task.getRequestByteSize() >= sizeThreshold)
) {
this._runNow(bundleId);
return ret;
}
if (!(bundleId in this._timers) && this._options.delayThreshold > 0) {
this._timers[bundleId] = setTimeout(() => {
delete this._timers[bundleId];
this._runNow(bundleId);
}, this._options.delayThreshold);
}
return ret;
}
/**
* Clears scheduled timeout if it exists.
*
* @param {String} bundleId - the id for the task whose timeout needs to be
* cleared.
* @private
*/
private _maybeClearTimeout(bundleId: string) {
if (bundleId in this._timers) {
const timerId = this._timers[bundleId];
delete this._timers[bundleId];
clearTimeout(timerId);
}
}
/**
* Cancels an event.
*
* @param {String} id - The id for the event in the task.
* @private
*/
private _cancel(id: string) {
if (!(id in this._invocations)) {
return;
}
const bundleId = this._invocations[id];
if (!(bundleId in this._tasks)) {
return;
}
const task = this._tasks[bundleId];
delete this._invocations[id];
if (task.cancel(id)) {
this._maybeClearTimeout(bundleId);
delete this._tasks[bundleId];
}
}
/**
* Invokes a task.
*
* @param {String} bundleId - The id for the task.
* @private
*/
_runNow(bundleId: string) {
if (!(bundleId in this._tasks)) {
warn('bundle_runnow_bundleid_unknown', `No such bundleid: ${bundleId}`);
return;
}
this._maybeClearTimeout(bundleId);
const task = this._tasks[bundleId];
delete this._tasks[bundleId];
task.run().forEach(id => {
delete this._invocations[id];
});
}
}