import {EventEmitter} from 'events';
import {status} from '@grpc/grpc-js';
import {GaxCallPromise, ResultTuple} from '../apitypes';
import {CancellablePromise} from '../call';
import {BackoffSettings, CallOptions} from '../gax';
import {GoogleError} from '../googleError';
import {Metadata, MetadataValue} from '../grpc';
import {LongRunningDescriptor} from './longRunningDescriptor';
/**
 * Node-style callback accepted by `Operation.getOperation`.
 *
 * It is invoked either with only an error, or with `null` followed by the
 * decoded payloads and the raw operation message they were decoded from.
 */
export interface GetOperationCallback {
(
// Error describing the failure; `null` when the operation message was
// unpacked without an error.
err?: Error | null,
// Decoded operation response; left undefined until the operation is done
// and carries a response.
result?: {},
// Decoded operation metadata; undefined when the message carries none.
metadata?: {},
// The raw operation message the values above were decoded from.
rawResponse?: Operation
): void;
}
/**
 * Client-side handle for a long-running operation.
 *
 * The instance wraps the raw operation message returned by the initial call
 * and can poll the operations client until the operation finishes.
 *
 * Polling is driven by listeners: it starts when the first `'complete'`
 * listener is attached and stops once the last one is removed.
 *
 * Events emitted while polling:
 *  - `'progress'` `(metadata, rawResponse)` when the raw metadata bytes change;
 *  - `'complete'` `(result, metadata, rawResponse)` once a result is decoded;
 *  - `'error'` `(error)` on timeout, operation failure, or a failed poll.
 */
export class Operation extends EventEmitter {
  /** Number of listeners currently registered for `'complete'`. */
  completeListeners: number;
  /** True while at least one `'complete'` listener is attached. */
  hasActiveListeners: boolean;
  /** Most recent raw operation message received. */
  latestResponse: Operation;
  /** Supplies the operations client and the response/metadata decoders. */
  longrunningDescriptor: LongRunningDescriptor;
  /** Last decoded response, or `null` if none has been decoded yet. */
  result: {} | null;
  /** Last decoded metadata, or `null` if none has been decoded yet. */
  metadata: Metadata | null;
  /** Delays and total timeout used by the polling loop. */
  backoffSettings: BackoffSettings;
  /** Call options forwarded to `getOperation` requests. */
  _callOptions?: CallOptions;
  /** The `getOperation` request started most recently, if any. */
  currentCallPromise_?: CancellablePromise<ResultTuple>;
  name?: string;
  done?: boolean;
  error?: GoogleError;
  response?: {value: {}};

  /**
   * @param grpcOp Raw operation message returned by the initial call.
   * @param longrunningDescriptor Provides the operations client and decoders.
   * @param backoffSettings Polling delays and total timeout.
   * @param callOptions Options forwarded to `getOperation`; its `promise`
   *   member, when present, is the constructor used for returned promises.
   */
  constructor(
    grpcOp: Operation,
    longrunningDescriptor: LongRunningDescriptor,
    backoffSettings: BackoffSettings,
    callOptions?: CallOptions
  ) {
    super();
    this.completeListeners = 0;
    this.hasActiveListeners = false;
    this.latestResponse = grpcOp;
    this.longrunningDescriptor = longrunningDescriptor;
    this.result = null;
    this.metadata = null;
    this.backoffSettings = backoffSettings;
    // Decode whatever the initial message already carries.
    this._unpackResponse(grpcOp);
    this._listenForEvents();
    this._callOptions = callOptions;
  }

  /**
   * Tracks `'complete'` listeners so that polling starts with the first one
   * and is flagged to stop when the last one is removed.
   */
  _listenForEvents() {
    this.on('newListener', event => {
      if (event !== 'complete') {
        return;
      }
      this.completeListeners++;
      if (!this.hasActiveListeners) {
        this.hasActiveListeners = true;
        this.startPolling_();
      }
    });
    this.on('removeListener', event => {
      if (event === 'complete' && --this.completeListeners === 0) {
        // The polling loop checks this flag before each attempt.
        this.hasActiveListeners = false;
      }
    });
  }

  /**
   * Cancels the in-flight `getOperation` request, if any, and asks the
   * operations client to cancel the operation itself.
   *
   * @returns The value returned by `operationsClient.cancelOperation`.
   */
  cancel() {
    if (this.currentCallPromise_) {
      this.currentCallPromise_.cancel();
    }
    const operationsClient = this.longrunningDescriptor.operationsClient;
    return operationsClient.cancelOperation({
      name: this.latestResponse.name,
    }) as CancellablePromise<ResultTuple>;
  }

  /**
   * Fetches the current state of the operation.
   *
   * If the latest known message is already done, no request is made and the
   * cached message is unpacked again. Otherwise `getOperation` is called on
   * the operations client and its first response element becomes
   * `latestResponse`.
   *
   * With a callback, the outcome (including a failed request) is delivered
   * to the callback and nothing is returned. Without one, a promise is
   * returned that resolves to `[result, metadata, latestResponse]` and
   * rejects when the operation carries an error or the request fails.
   *
   * @param callback Optional callback receiving the outcome.
   */
  getOperation(): Promise<{}>;
  getOperation(callback: GetOperationCallback): void;
  getOperation(callback?: GetOperationCallback): Promise<{}> | void {
    const operationsClient = this.longrunningDescriptor.operationsClient;
    // `callOptions` is optional, so fall back to the global Promise rather
    // than dereferencing an undefined object.
    const PromiseCtor =
      (this._callOptions && this._callOptions.promise) || Promise;

    // Builds the promise for promise-mode callers from the current state;
    // returns undefined in callback mode.
    const promisifyResponse = () => {
      if (callback) {
        return;
      }
      return new PromiseCtor((resolve, reject) => {
        const failure = this.latestResponse.error;
        if (failure) {
          const error = new GoogleError(failure.message);
          error.code = failure.code;
          reject(error);
        } else {
          resolve([this.result, this.metadata, this.latestResponse]);
        }
      });
    };

    if (this.latestResponse.done) {
      this._unpackResponse(this.latestResponse, callback);
      return promisifyResponse();
    }

    this.currentCallPromise_ = (operationsClient.getOperation as GaxCallPromise)(
      {name: this.latestResponse.name},
      this._callOptions!
    );

    const noCallbackPromise = this.currentCallPromise_!.then(
      responses => {
        this.latestResponse = responses[0] as Operation;
        this._unpackResponse(responses[0] as Operation, callback);
        return promisifyResponse()!;
      },
      (err: Error) => {
        // A failed request must reach the caller: without this handler a
        // callback-mode caller (such as the polling loop) would never hear
        // about it and the rejection would go unhandled.
        if (callback) {
          callback(err);
          return;
        }
        throw err;
      }
    );

    if (!callback) {
      return noCallbackPromise as Promise<{}>;
    }
  }

  /**
   * Decodes the response and metadata of `op` using the descriptor's
   * decoders, stores them on this instance and reports them to `callback`.
   *
   * When `op` is done with `result === 'error'`, only the error is reported
   * and nothing is stored.
   *
   * @param op Raw operation message to unpack.
   * @param callback Optional receiver of the decoded values or the error.
   */
  _unpackResponse(op: Operation, callback?: GetOperationCallback) {
    const responseDecoder = this.longrunningDescriptor.responseDecoder;
    const metadataDecoder = this.longrunningDescriptor.metadataDecoder;
    let response: {} | undefined;
    let metadata: Metadata | undefined;

    if (op.done) {
      if (op.result === 'error') {
        const error = new GoogleError(op.error!.message);
        error.code = op.error!.code;
        if (callback) {
          callback(error);
        }
        return;
      }
      if (responseDecoder && op.response) {
        const decodedResponse = responseDecoder(op.response.value);
        response = decodedResponse;
        this.result = decodedResponse;
      }
    }

    if (metadataDecoder && op.metadata) {
      const decodedMetadata = metadataDecoder(op.metadata.value);
      metadata = decodedMetadata;
      this.metadata = decodedMetadata;
    }

    if (callback) {
      callback(null, response, metadata, op);
    }
  }

  /**
   * Polls `getOperation` until a result is available, an error occurs, the
   * total timeout elapses, or no `'complete'` listener remains.
   *
   * The first poll happens immediately. Each later poll waits for the
   * current delay, which starts at `initialRetryDelayMillis` and is then
   * multiplied by `retryDelayMultiplier`, capped at `maxRetryDelayMillis`.
   * A falsy `totalTimeoutMillis` means there is no deadline.
   */
  startPolling_() {
    let now = new Date();
    const delayMult = this.backoffSettings.retryDelayMultiplier;
    const maxDelay = this.backoffSettings.maxRetryDelayMillis;
    let delay = this.backoffSettings.initialRetryDelayMillis;
    let deadline = Infinity;
    if (this.backoffSettings.totalTimeoutMillis) {
      deadline = now.getTime() + this.backoffSettings.totalTimeoutMillis;
    }
    // Raw metadata bytes last seen, used to detect progress between polls.
    let previousMetadataBytes: MetadataValue;
    if (this.latestResponse.metadata) {
      previousMetadataBytes = this.latestResponse.metadata.value;
    }

    // Wrapper so `setImmediate` can invoke `emit` with the right receiver.
    const emit = (event: string | symbol, ...args: unknown[]) => {
      this.emit(event, ...args);
    };

    const retry = () => {
      if (!this.hasActiveListeners) {
        return;
      }
      if (now.getTime() >= deadline) {
        const error = new GoogleError(
          'Total timeout exceeded before any response was received'
        );
        error.code = status.DEADLINE_EXCEEDED;
        setImmediate(emit, 'error', error);
        return;
      }
      this.getOperation((err, result, metadata, rawResponse) => {
        if (err) {
          setImmediate(emit, 'error', err);
          return;
        }
        if (result) {
          setImmediate(emit, 'complete', result, metadata, rawResponse);
          return;
        }
        // No result yet: report progress if the metadata bytes changed.
        if (
          rawResponse!.metadata &&
          (!previousMetadataBytes ||
            !rawResponse!.metadata!.value.equals(previousMetadataBytes))
        ) {
          setImmediate(emit, 'progress', metadata, rawResponse);
          previousMetadataBytes = rawResponse!.metadata!.value;
        }
        if (rawResponse!.done) {
          const error = new GoogleError(
            'Long running operation has finished but there was no result'
          );
          error.code = status.UNKNOWN;
          setImmediate(emit, 'error', error);
          return;
        }
        setTimeout(() => {
          now = new Date();
          delay = Math.min(delay * delayMult, maxDelay);
          retry();
        }, delay);
      });
    };

    retry();
  }

  /**
   * Returns a promise that resolves to `[result, metadata, rawResponse]`
   * when `'complete'` is emitted and rejects when `'error'` is emitted.
   *
   * Attaching the `'complete'` listener here starts polling if it is not
   * already running. The promise is built with `callOptions.promise` when
   * provided, otherwise with the global `Promise`.
   */
  promise() {
    const PromiseCtor =
      (this._callOptions && this._callOptions.promise) || Promise;
    return new PromiseCtor((resolve, reject) => {
      this.on('error', reject).on(
        'complete',
        (result, metadata, rawResponse) => {
          resolve([result, metadata, rawResponse]);
        }
      );
    });
  }
}
/**
 * Factory wrapper around the `Operation` constructor.
 *
 * @param op Raw operation message to wrap.
 * @param longrunningDescriptor Descriptor handed to the new `Operation`.
 * @param backoffSettings Backoff settings handed to the new `Operation`.
 * @param callOptions Optional call options handed to the new `Operation`.
 * @returns The newly constructed `Operation`.
 */
export function operation(
  op: Operation,
  longrunningDescriptor: LongRunningDescriptor,
  backoffSettings: BackoffSettings,
  callOptions?: CallOptions
) {
  const wrapped = new Operation(
    op,
    longrunningDescriptor,
    backoffSettings,
    callOptions
  );
  return wrapped;
}