How to write tests for long-running operations
The Google Cloud client libraries for Rust have helpers that simplify interaction with long-running operations (henceforth, LROs).
Simulating the behavior of LROs in tests involves understanding the details these helpers hide. This guide shows how to do that.
Prerequisites
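This guide assumes you are familiar with the previous chapters, in particular the ones on working with long-running operations and on writing tests with a mocked client.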
Tests for automatic polling
Assume the application code awaits lro::Poller::until_done(). In previous
sections, this was called “automatic polling”.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
    use super::*;

    /// Example application function that relies on automatic polling.
    ///
    /// Starts a `BatchRecognize` LRO for the recognizer
    /// `projects/{project_id}/locations/global/recognizers/_`, waits on
    /// `until_done()`, and returns the `total_billed_duration` field of the
    /// final response.
    ///
    /// # Errors
    ///
    /// Any error produced by `until_done()` is handed back to the caller.
    pub async fn my_automatic_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> Result<Option<wkt::Duration>> {
        use google_cloud_lro::Poller;

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let response = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller()
            .until_done()
            .await?;
        Ok(response.total_billed_duration)
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Generates `MockSpeech`, a mock of the `speech::stub::Speech` trait.
    // Only `batch_recognize()` is mocked because it is the only method this
    // test sets an expectation on.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(
                &self,
                req: BatchRecognizeRequest,
                _options: gax::options::RequestOptions,
            ) -> Result<Response<Operation>>;
        }
    }

    /// The billed duration the simulated LRO reports.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    /// The final response of the simulated LRO, carrying `expected_duration()`.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    /// Wraps `response` in an `Operation` that is already done.
    ///
    /// The stub returns a `longrunning::model::Operation`, so the response is
    /// packed into an `Any` and stored as the operation result, with `done`
    /// set to `true`.
    fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
        let packed = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(packed.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    #[tokio::test]
    async fn automatic_polling() -> Result<()> {
        // The mock answers the initial request with a finished operation.
        let mut stub = MockSpeech::new();
        stub.expect_batch_recognize()
            .return_once(|_, _| make_finished_operation(&expected_response()));

        // Build a client backed by the mock and run the code under test.
        let client = speech::client::Speech::from_stub(stub);
        let got = my_automatic_poller(&client, "my-project").await?;

        // Only the final result of the LRO matters to the application.
        let want = expected_duration();
        assert_eq!(got, want);
        Ok(())
    }
}
Note that the application only cares about the final result of the LRO. You do not need to test how it handles intermediate results from polling the LRO. The tests can simply return the final result of the LRO from the mock.
Creating the longrunning::model::Operation
Assume you want the call to result in the following response.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
    use super::*;

    /// Example application function that relies on automatic polling.
    ///
    /// Starts a `BatchRecognize` LRO for the recognizer
    /// `projects/{project_id}/locations/global/recognizers/_`, waits on
    /// `until_done()`, and returns the `total_billed_duration` field of the
    /// final response.
    ///
    /// # Errors
    ///
    /// Any error produced by `until_done()` is handed back to the caller.
    pub async fn my_automatic_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> Result<Option<wkt::Duration>> {
        use google_cloud_lro::Poller;

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let response = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller()
            .until_done()
            .await?;
        Ok(response.total_billed_duration)
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Generates `MockSpeech`, a mock of the `speech::stub::Speech` trait.
    // Only `batch_recognize()` is mocked because it is the only method this
    // test sets an expectation on.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(
                &self,
                req: BatchRecognizeRequest,
                _options: gax::options::RequestOptions,
            ) -> Result<Response<Operation>>;
        }
    }

    /// The billed duration the simulated LRO reports.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    /// The final response of the simulated LRO, carrying `expected_duration()`.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    /// Wraps `response` in an `Operation` that is already done.
    ///
    /// The stub returns a `longrunning::model::Operation`, so the response is
    /// packed into an `Any` and stored as the operation result, with `done`
    /// set to `true`.
    fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
        let packed = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(packed.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    #[tokio::test]
    async fn automatic_polling() -> Result<()> {
        // The mock answers the initial request with a finished operation.
        let mut stub = MockSpeech::new();
        stub.expect_batch_recognize()
            .return_once(|_, _| make_finished_operation(&expected_response()));

        // Build a client backed by the mock and run the code under test.
        let client = speech::client::Speech::from_stub(stub);
        let got = my_automatic_poller(&client, "my-project").await?;

        // Only the final result of the LRO matters to the application.
        let want = expected_duration();
        assert_eq!(got, want);
        Ok(())
    }
}
You may have noticed that the stub returns a longrunning::model::Operation,
not a BatchRecognizeResponse. You need to pack the desired response into the
Operation::result field.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
    use super::*;

    /// Example application function that relies on automatic polling.
    ///
    /// Starts a `BatchRecognize` LRO for the recognizer
    /// `projects/{project_id}/locations/global/recognizers/_`, waits on
    /// `until_done()`, and returns the `total_billed_duration` field of the
    /// final response.
    ///
    /// # Errors
    ///
    /// Any error produced by `until_done()` is handed back to the caller.
    pub async fn my_automatic_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> Result<Option<wkt::Duration>> {
        use google_cloud_lro::Poller;

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let response = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller()
            .until_done()
            .await?;
        Ok(response.total_billed_duration)
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Generates `MockSpeech`, a mock of the `speech::stub::Speech` trait.
    // Only `batch_recognize()` is mocked because it is the only method this
    // test sets an expectation on.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(
                &self,
                req: BatchRecognizeRequest,
                _options: gax::options::RequestOptions,
            ) -> Result<Response<Operation>>;
        }
    }

    /// The billed duration the simulated LRO reports.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    /// The final response of the simulated LRO, carrying `expected_duration()`.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    /// Wraps `response` in an `Operation` that is already done.
    ///
    /// The stub returns a `longrunning::model::Operation`, so the response is
    /// packed into an `Any` and stored as the operation result, with `done`
    /// set to `true`.
    fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
        let packed = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(packed.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    #[tokio::test]
    async fn automatic_polling() -> Result<()> {
        // The mock answers the initial request with a finished operation.
        let mut stub = MockSpeech::new();
        stub.expect_batch_recognize()
            .return_once(|_, _| make_finished_operation(&expected_response()));

        // Build a client backed by the mock and run the code under test.
        let client = speech::client::Speech::from_stub(stub);
        let got = my_automatic_poller(&client, "my-project").await?;

        // Only the final result of the LRO matters to the application.
        let want = expected_duration();
        assert_eq!(got, want);
        Ok(())
    }
}
Note also that the done field is set to true. This indicates to the Poller
that the operation has completed, thus ending the polling loop.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
    use super::*;

    /// Example application function that relies on automatic polling.
    ///
    /// Starts a `BatchRecognize` LRO for the recognizer
    /// `projects/{project_id}/locations/global/recognizers/_`, waits on
    /// `until_done()`, and returns the `total_billed_duration` field of the
    /// final response.
    ///
    /// # Errors
    ///
    /// Any error produced by `until_done()` is handed back to the caller.
    pub async fn my_automatic_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> Result<Option<wkt::Duration>> {
        use google_cloud_lro::Poller;

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let response = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller()
            .until_done()
            .await?;
        Ok(response.total_billed_duration)
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Generates `MockSpeech`, a mock of the `speech::stub::Speech` trait.
    // Only `batch_recognize()` is mocked because it is the only method this
    // test sets an expectation on.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(
                &self,
                req: BatchRecognizeRequest,
                _options: gax::options::RequestOptions,
            ) -> Result<Response<Operation>>;
        }
    }

    /// The billed duration the simulated LRO reports.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    /// The final response of the simulated LRO, carrying `expected_duration()`.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    /// Wraps `response` in an `Operation` that is already done.
    ///
    /// The stub returns a `longrunning::model::Operation`, so the response is
    /// packed into an `Any` and stored as the operation result, with `done`
    /// set to `true`.
    fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
        let packed = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(packed.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    #[tokio::test]
    async fn automatic_polling() -> Result<()> {
        // The mock answers the initial request with a finished operation.
        let mut stub = MockSpeech::new();
        stub.expect_batch_recognize()
            .return_once(|_, _| make_finished_operation(&expected_response()));

        // Build a client backed by the mock and run the code under test.
        let client = speech::client::Speech::from_stub(stub);
        let got = my_automatic_poller(&client, "my-project").await?;

        // Only the final result of the LRO matters to the application.
        let want = expected_duration();
        assert_eq!(got, want);
        Ok(())
    }
}
Test code
Now you are ready to write the test.
First, define the mock struct, which implements the
speech::stub::Speech trait.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
    use super::*;

    /// Example application function that relies on automatic polling.
    ///
    /// Starts a `BatchRecognize` LRO for the recognizer
    /// `projects/{project_id}/locations/global/recognizers/_`, waits on
    /// `until_done()`, and returns the `total_billed_duration` field of the
    /// final response.
    ///
    /// # Errors
    ///
    /// Any error produced by `until_done()` is handed back to the caller.
    pub async fn my_automatic_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> Result<Option<wkt::Duration>> {
        use google_cloud_lro::Poller;

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let response = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller()
            .until_done()
            .await?;
        Ok(response.total_billed_duration)
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Generates `MockSpeech`, a mock of the `speech::stub::Speech` trait.
    // Only `batch_recognize()` is mocked because it is the only method this
    // test sets an expectation on.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(
                &self,
                req: BatchRecognizeRequest,
                _options: gax::options::RequestOptions,
            ) -> Result<Response<Operation>>;
        }
    }

    /// The billed duration the simulated LRO reports.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    /// The final response of the simulated LRO, carrying `expected_duration()`.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    /// Wraps `response` in an `Operation` that is already done.
    ///
    /// The stub returns a `longrunning::model::Operation`, so the response is
    /// packed into an `Any` and stored as the operation result, with `done`
    /// set to `true`.
    fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
        let packed = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(packed.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    #[tokio::test]
    async fn automatic_polling() -> Result<()> {
        // The mock answers the initial request with a finished operation.
        let mut stub = MockSpeech::new();
        stub.expect_batch_recognize()
            .return_once(|_, _| make_finished_operation(&expected_response()));

        // Build a client backed by the mock and run the code under test.
        let client = speech::client::Speech::from_stub(stub);
        let got = my_automatic_poller(&client, "my-project").await?;

        // Only the final result of the LRO matters to the application.
        let want = expected_duration();
        assert_eq!(got, want);
        Ok(())
    }
}
Now in the test, create the mock, and set expectations on it.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
    use super::*;

    /// Example application function that relies on automatic polling.
    ///
    /// Starts a `BatchRecognize` LRO for the recognizer
    /// `projects/{project_id}/locations/global/recognizers/_`, waits on
    /// `until_done()`, and returns the `total_billed_duration` field of the
    /// final response.
    ///
    /// # Errors
    ///
    /// Any error produced by `until_done()` is handed back to the caller.
    pub async fn my_automatic_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> Result<Option<wkt::Duration>> {
        use google_cloud_lro::Poller;

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let response = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller()
            .until_done()
            .await?;
        Ok(response.total_billed_duration)
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Generates `MockSpeech`, a mock of the `speech::stub::Speech` trait.
    // Only `batch_recognize()` is mocked because it is the only method this
    // test sets an expectation on.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(
                &self,
                req: BatchRecognizeRequest,
                _options: gax::options::RequestOptions,
            ) -> Result<Response<Operation>>;
        }
    }

    /// The billed duration the simulated LRO reports.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    /// The final response of the simulated LRO, carrying `expected_duration()`.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    /// Wraps `response` in an `Operation` that is already done.
    ///
    /// The stub returns a `longrunning::model::Operation`, so the response is
    /// packed into an `Any` and stored as the operation result, with `done`
    /// set to `true`.
    fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
        let packed = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(packed.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    #[tokio::test]
    async fn automatic_polling() -> Result<()> {
        // The mock answers the initial request with a finished operation.
        let mut stub = MockSpeech::new();
        stub.expect_batch_recognize()
            .return_once(|_, _| make_finished_operation(&expected_response()));

        // Build a client backed by the mock and run the code under test.
        let client = speech::client::Speech::from_stub(stub);
        let got = my_automatic_poller(&client, "my-project").await?;

        // Only the final result of the LRO matters to the application.
        let want = expected_duration();
        assert_eq!(got, want);
        Ok(())
    }
}
Finally, create a client from the mock, call the function, and verify the response.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
    use super::*;

    /// Example application function that relies on automatic polling.
    ///
    /// Starts a `BatchRecognize` LRO for the recognizer
    /// `projects/{project_id}/locations/global/recognizers/_`, waits on
    /// `until_done()`, and returns the `total_billed_duration` field of the
    /// final response.
    ///
    /// # Errors
    ///
    /// Any error produced by `until_done()` is handed back to the caller.
    pub async fn my_automatic_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> Result<Option<wkt::Duration>> {
        use google_cloud_lro::Poller;

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let response = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller()
            .until_done()
            .await?;
        Ok(response.total_billed_duration)
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Generates `MockSpeech`, a mock of the `speech::stub::Speech` trait.
    // Only `batch_recognize()` is mocked because it is the only method this
    // test sets an expectation on.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(
                &self,
                req: BatchRecognizeRequest,
                _options: gax::options::RequestOptions,
            ) -> Result<Response<Operation>>;
        }
    }

    /// The billed duration the simulated LRO reports.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    /// The final response of the simulated LRO, carrying `expected_duration()`.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    /// Wraps `response` in an `Operation` that is already done.
    ///
    /// The stub returns a `longrunning::model::Operation`, so the response is
    /// packed into an `Any` and stored as the operation result, with `done`
    /// set to `true`.
    fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
        let packed = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(packed.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    #[tokio::test]
    async fn automatic_polling() -> Result<()> {
        // The mock answers the initial request with a finished operation.
        let mut stub = MockSpeech::new();
        stub.expect_batch_recognize()
            .return_once(|_, _| make_finished_operation(&expected_response()));

        // Build a client backed by the mock and run the code under test.
        let client = speech::client::Speech::from_stub(stub);
        let got = my_automatic_poller(&client, "my-project").await?;

        // Only the final result of the LRO matters to the application.
        let want = expected_duration();
        assert_eq!(got, want);
        Ok(())
    }
}
Tests for manual polling with intermediate metadata
Assume the application code manually polls, and does some processing on partial updates.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    /// Everything `my_manual_poller()` gathered from one LRO.
    pub struct BatchRecognizeResult {
        /// The `progress_percent` of each partial update, in arrival order.
        pub progress_updates: Vec<i32>,
        /// The `total_billed_duration` of the final response, or the error
        /// that ended the polling loop.
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    /// Example application function that polls an LRO manually.
    ///
    /// Starts a `BatchRecognize` LRO and consolidates every polling result,
    /// whether partial or final:
    ///
    /// - a partial update contributes its `progress_percent` field,
    /// - the final result contributes its `total_billed_duration` field,
    /// - a polling error stops the loop and is reported in place of the
    ///   billed duration.
    ///
    /// The function waits 500ms after each in-progress result before polling
    /// again.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let mut progress_updates = Vec::new();
        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller();

        let billed_duration = loop {
            let status = match poller.poll().await {
                Some(status) => status,
                // `poll()` only returns `None` after it has returned
                // `PollingResult::Completed`, and that result already ended
                // the loop. Therefore this is never reached.
                None => unreachable!("loop should exit via the `Completed` branch."),
            };
            match status {
                PollingResult::Completed(outcome) => {
                    break outcome.map(|response| response.total_billed_duration);
                }
                PollingResult::PollingError(error) => break Err(error),
                PollingResult::InProgress(Some(metadata)) => {
                    // This is a silly application. A real one would likely
                    // act on the partial update right away rather than keep
                    // it until the operation has completed.
                    progress_updates.push(metadata.progress_percent);
                }
                PollingResult::InProgress(None) => {}
            }
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        };

        BatchRecognizeResult {
            progress_updates,
            billed_duration,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Generates `MockSpeech`, a mock of the `speech::stub::Speech` trait.
    // `get_operation()` is mocked in addition to `batch_recognize()` because
    // the test below sets expectations on both.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(
                &self,
                req: BatchRecognizeRequest,
                _options: gax::options::RequestOptions,
            ) -> Result<Response<Operation>>;
            async fn get_operation(
                &self,
                req: GetOperationRequest,
                _options: gax::options::RequestOptions,
            ) -> Result<Response<Operation>>;
        }
    }

    /// The billed duration the simulated LRO reports.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    /// The final response of the simulated LRO, carrying `expected_duration()`.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    /// Wraps `response` in an `Operation` that is already done.
    ///
    /// The response is packed into an `Any` and stored as the operation
    /// result, with `done` set to `true`.
    fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
        let packed = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(packed.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    /// Builds an `Operation` that only carries metadata.
    ///
    /// The metadata is an `OperationMetadata` with `progress_percent` set to
    /// `progress`, packed into an `Any`. Neither `done` nor a result is set.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let metadata = OperationMetadata::new().set_progress_percent(progress);
        let packed = wkt::Any::from_msg(&metadata).expect("test message should succeed");
        let pending = Operation::new().set_metadata(packed);
        Ok(Response::from(pending))
    }

    // Note: `my_manual_poller()` sleeps 500ms after each in-progress result,
    // so this test takes roughly 1.5 seconds.
    #[tokio::test]
    async fn manual_polling_with_metadata() -> Result<()> {
        let mut order = mockall::Sequence::new();
        let mut stub = MockSpeech::new();

        // The initial request reports 25% progress.
        stub.expect_batch_recognize()
            .once()
            .in_sequence(&mut order)
            .returning(|_, _| make_partial_operation(25));
        // The next two polls report further progress.
        for progress in [50, 75] {
            stub.expect_get_operation()
                .once()
                .in_sequence(&mut order)
                .returning(move |_, _| make_partial_operation(progress));
        }
        // The last poll returns the finished operation.
        stub.expect_get_operation()
            .once()
            .in_sequence(&mut order)
            .returning(|_, _| make_finished_operation(&expected_response()));

        // Build a client backed by the mock and run the code under test.
        let client = speech::client::Speech::from_stub(stub);
        let BatchRecognizeResult {
            progress_updates,
            billed_duration,
        } = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(progress_updates, [25, 50, 75]);
        assert_eq!(billed_duration?, expected_duration());
        Ok(())
    }
}
Simulate how the application acts when it receives intermediate metadata by returning in-progress operations from the mock.
Creating the longrunning::model::Operation
The BatchRecognize RPC returns partial results in the form of a
speech::model::OperationMetadata. Like before, you need to pack this into the
returned longrunning::model::Operation, but this time into the
Operation::metadata field.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    /// Everything `my_manual_poller()` gathered from one LRO.
    pub struct BatchRecognizeResult {
        /// The `progress_percent` of each partial update, in arrival order.
        pub progress_updates: Vec<i32>,
        /// The `total_billed_duration` of the final response, or the error
        /// that ended the polling loop.
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    /// Example application function that polls an LRO manually.
    ///
    /// Starts a `BatchRecognize` LRO and consolidates every polling result,
    /// whether partial or final:
    ///
    /// - a partial update contributes its `progress_percent` field,
    /// - the final result contributes its `total_billed_duration` field,
    /// - a polling error stops the loop and is reported in place of the
    ///   billed duration.
    ///
    /// The function waits 500ms after each in-progress result before polling
    /// again.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let mut progress_updates = Vec::new();
        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller();

        let billed_duration = loop {
            let status = match poller.poll().await {
                Some(status) => status,
                // `poll()` only returns `None` after it has returned
                // `PollingResult::Completed`, and that result already ended
                // the loop. Therefore this is never reached.
                None => unreachable!("loop should exit via the `Completed` branch."),
            };
            match status {
                PollingResult::Completed(outcome) => {
                    break outcome.map(|response| response.total_billed_duration);
                }
                PollingResult::PollingError(error) => break Err(error),
                PollingResult::InProgress(Some(metadata)) => {
                    // This is a silly application. A real one would likely
                    // act on the partial update right away rather than keep
                    // it until the operation has completed.
                    progress_updates.push(metadata.progress_percent);
                }
                PollingResult::InProgress(None) => {}
            }
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        };

        BatchRecognizeResult {
            progress_updates,
            billed_duration,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Generates `MockSpeech`, a mock of the `speech::stub::Speech` trait.
    // `get_operation()` is mocked in addition to `batch_recognize()` because
    // the test below sets expectations on both.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(
                &self,
                req: BatchRecognizeRequest,
                _options: gax::options::RequestOptions,
            ) -> Result<Response<Operation>>;
            async fn get_operation(
                &self,
                req: GetOperationRequest,
                _options: gax::options::RequestOptions,
            ) -> Result<Response<Operation>>;
        }
    }

    /// The billed duration the simulated LRO reports.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    /// The final response of the simulated LRO, carrying `expected_duration()`.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    /// Wraps `response` in an `Operation` that is already done.
    ///
    /// The response is packed into an `Any` and stored as the operation
    /// result, with `done` set to `true`.
    fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
        let packed = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(packed.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    /// Builds an `Operation` that only carries metadata.
    ///
    /// The metadata is an `OperationMetadata` with `progress_percent` set to
    /// `progress`, packed into an `Any`. Neither `done` nor a result is set.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let metadata = OperationMetadata::new().set_progress_percent(progress);
        let packed = wkt::Any::from_msg(&metadata).expect("test message should succeed");
        let pending = Operation::new().set_metadata(packed);
        Ok(Response::from(pending))
    }

    // Note: `my_manual_poller()` sleeps 500ms after each in-progress result,
    // so this test takes roughly 1.5 seconds.
    #[tokio::test]
    async fn manual_polling_with_metadata() -> Result<()> {
        let mut order = mockall::Sequence::new();
        let mut stub = MockSpeech::new();

        // The initial request reports 25% progress.
        stub.expect_batch_recognize()
            .once()
            .in_sequence(&mut order)
            .returning(|_, _| make_partial_operation(25));
        // The next two polls report further progress.
        for progress in [50, 75] {
            stub.expect_get_operation()
                .once()
                .in_sequence(&mut order)
                .returning(move |_, _| make_partial_operation(progress));
        }
        // The last poll returns the finished operation.
        stub.expect_get_operation()
            .once()
            .in_sequence(&mut order)
            .returning(|_, _| make_finished_operation(&expected_response()));

        // Build a client backed by the mock and run the code under test.
        let client = speech::client::Speech::from_stub(stub);
        let BatchRecognizeResult {
            progress_updates,
            billed_duration,
        } = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(progress_updates, [25, 50, 75]);
        assert_eq!(billed_duration?, expected_duration());
        Ok(())
    }
}
Test code
First, define the mock class, which implements the
speech::stub::Speech trait. Note that get_operation() is
overridden. The reason for this will be clear shortly.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    /// What `my_manual_poller` observed while polling a `BatchRecognize` LRO.
    pub struct BatchRecognizeResult {
        /// The `progress_percent` of each partial update that carried
        /// metadata, in the order received.
        pub progress_updates: Vec<i32>,
        /// The `total_billed_duration` of the final response, or the error
        /// that ended the polling loop.
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    /// Starts a `BatchRecognize` LRO and polls it manually until it finishes.
    ///
    /// Partial updates contribute their `progress_percent` field to the
    /// result. The final response contributes its `total_billed_duration`
    /// field. A polling error ends the loop and is returned in
    /// `billed_duration`.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller();
        let mut progress_updates = Vec::new();

        while let Some(outcome) = poller.poll().await {
            let billed_duration = match outcome {
                PollingResult::InProgress(metadata) => {
                    // This is a silly application. Your application likely
                    // performs some task immediately with the partial
                    // update, instead of storing it for after the operation
                    // has completed.
                    if let Some(partial) = metadata {
                        progress_updates.push(partial.progress_percent);
                    }
                    // Wait before polling again.
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                    continue;
                }
                PollingResult::Completed(response) => response.map(|r| r.total_billed_duration),
                PollingResult::PollingError(error) => Err(error),
            };
            // Both the completed and the error branches end the loop.
            return BatchRecognizeResult {
                progress_updates,
                billed_duration,
            };
        }
        // We can only get here if `poll()` returns `None`, but it only returns
        // `None` after it returned `PollingResult::Completed`. Therefore this
        // is never reached.
        unreachable!("loop should exit via the `Completed` branch.");
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // A mock of the `Speech` stub. It mocks `get_operation()` in addition to
    // `batch_recognize()`, because the test sets expectations on both.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // The billed duration reported by the simulated LRO.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    // The final response of the simulated LRO.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    // Wraps `response` in an operation that is marked as done.
    fn make_finished_operation(
        response: &BatchRecognizeResponse,
    ) -> Result<gax::response::Response<Operation>> {
        let payload = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(payload.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    // Builds an operation that does not set `done`, and whose metadata
    // reports `progress` percent.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let update = OperationMetadata::new().set_progress_percent(progress);
        let packed = wkt::Any::from_msg(&update).expect("test message should succeed");
        Ok(Response::from(Operation::new().set_metadata(packed)))
    }

    #[tokio::test]
    async fn manual_polling_with_metadata() -> Result<()> {
        let mut order = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        // Starting the LRO reports 25% progress.
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut order)
            .returning(|_, _| make_partial_operation(25));
        // The next two polls report further partial progress.
        for progress in [50, 75] {
            mock.expect_get_operation()
                .once()
                .in_sequence(&mut order)
                .returning(move |_, _| make_partial_operation(progress));
        }
        // The last poll reports the finished operation.
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut order)
            .returning(|_, _| make_finished_operation(&expected_response()));

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let BatchRecognizeResult {
            progress_updates,
            billed_duration,
        } = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(progress_updates, [25, 50, 75]);
        assert_eq!(billed_duration?, expected_duration());
        Ok(())
    }
}
Now in the test, create the mock, and set expectations on it.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    /// What `my_manual_poller` observed while polling a `BatchRecognize` LRO.
    pub struct BatchRecognizeResult {
        /// The `progress_percent` of each partial update that carried
        /// metadata, in the order received.
        pub progress_updates: Vec<i32>,
        /// The `total_billed_duration` of the final response, or the error
        /// that ended the polling loop.
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    /// Starts a `BatchRecognize` LRO and polls it manually until it finishes.
    ///
    /// Partial updates contribute their `progress_percent` field to the
    /// result. The final response contributes its `total_billed_duration`
    /// field. A polling error ends the loop and is returned in
    /// `billed_duration`.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller();
        let mut progress_updates = Vec::new();

        while let Some(outcome) = poller.poll().await {
            let billed_duration = match outcome {
                PollingResult::InProgress(metadata) => {
                    // This is a silly application. Your application likely
                    // performs some task immediately with the partial
                    // update, instead of storing it for after the operation
                    // has completed.
                    if let Some(partial) = metadata {
                        progress_updates.push(partial.progress_percent);
                    }
                    // Wait before polling again.
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                    continue;
                }
                PollingResult::Completed(response) => response.map(|r| r.total_billed_duration),
                PollingResult::PollingError(error) => Err(error),
            };
            // Both the completed and the error branches end the loop.
            return BatchRecognizeResult {
                progress_updates,
                billed_duration,
            };
        }
        // We can only get here if `poll()` returns `None`, but it only returns
        // `None` after it returned `PollingResult::Completed`. Therefore this
        // is never reached.
        unreachable!("loop should exit via the `Completed` branch.");
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // A mock of the `Speech` stub. It mocks `get_operation()` in addition to
    // `batch_recognize()`, because the test sets expectations on both.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // The billed duration reported by the simulated LRO.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    // The final response of the simulated LRO.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    // Wraps `response` in an operation that is marked as done.
    fn make_finished_operation(
        response: &BatchRecognizeResponse,
    ) -> Result<gax::response::Response<Operation>> {
        let payload = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(payload.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    // Builds an operation that does not set `done`, and whose metadata
    // reports `progress` percent.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let update = OperationMetadata::new().set_progress_percent(progress);
        let packed = wkt::Any::from_msg(&update).expect("test message should succeed");
        Ok(Response::from(Operation::new().set_metadata(packed)))
    }

    #[tokio::test]
    async fn manual_polling_with_metadata() -> Result<()> {
        let mut order = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        // Starting the LRO reports 25% progress.
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut order)
            .returning(|_, _| make_partial_operation(25));
        // The next two polls report further partial progress.
        for progress in [50, 75] {
            mock.expect_get_operation()
                .once()
                .in_sequence(&mut order)
                .returning(move |_, _| make_partial_operation(progress));
        }
        // The last poll reports the finished operation.
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut order)
            .returning(|_, _| make_finished_operation(&expected_response()));

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let BatchRecognizeResult {
            progress_updates,
            billed_duration,
        } = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(progress_updates, [25, 50, 75]);
        assert_eq!(billed_duration?, expected_duration());
        Ok(())
    }
}
These expectations will return partial results (25%, 50%, 75%), then return the desired final outcome.
You probably noticed a few things:
-
The first expectation is set on
batch_recognize(), whereas all subsequent expectations are set on get_operation(). The initial
BatchRecognize RPC starts the LRO on the server-side. The server returns some identifier for the LRO. This is the name field, which is omitted from the test code for simplicity. From then on, the client library just polls the status of that LRO. It does this using the
GetOperation RPC. That is why you set expectations on different RPCs for the initial response vs. all subsequent responses.
-
Expectations are set in a sequence.
This allows
mockall to verify the order of the calls. It is also necessary to determine which expect_get_operation is matched.
Finally, create a client from the mock, call the function, and verify the response.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    /// What `my_manual_poller` observed while polling a `BatchRecognize` LRO.
    pub struct BatchRecognizeResult {
        /// The `progress_percent` of each partial update that carried
        /// metadata, in the order received.
        pub progress_updates: Vec<i32>,
        /// The `total_billed_duration` of the final response, or the error
        /// that ended the polling loop.
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    /// Starts a `BatchRecognize` LRO and polls it manually until it finishes.
    ///
    /// Partial updates contribute their `progress_percent` field to the
    /// result. The final response contributes its `total_billed_duration`
    /// field. A polling error ends the loop and is returned in
    /// `billed_duration`.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller();
        let mut progress_updates = Vec::new();

        while let Some(outcome) = poller.poll().await {
            let billed_duration = match outcome {
                PollingResult::InProgress(metadata) => {
                    // This is a silly application. Your application likely
                    // performs some task immediately with the partial
                    // update, instead of storing it for after the operation
                    // has completed.
                    if let Some(partial) = metadata {
                        progress_updates.push(partial.progress_percent);
                    }
                    // Wait before polling again.
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                    continue;
                }
                PollingResult::Completed(response) => response.map(|r| r.total_billed_duration),
                PollingResult::PollingError(error) => Err(error),
            };
            // Both the completed and the error branches end the loop.
            return BatchRecognizeResult {
                progress_updates,
                billed_duration,
            };
        }
        // We can only get here if `poll()` returns `None`, but it only returns
        // `None` after it returned `PollingResult::Completed`. Therefore this
        // is never reached.
        unreachable!("loop should exit via the `Completed` branch.");
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // A mock of the `Speech` stub. It mocks `get_operation()` in addition to
    // `batch_recognize()`, because the test sets expectations on both.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // The billed duration reported by the simulated LRO.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    // The final response of the simulated LRO.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    // Wraps `response` in an operation that is marked as done.
    fn make_finished_operation(
        response: &BatchRecognizeResponse,
    ) -> Result<gax::response::Response<Operation>> {
        let payload = wkt::Any::from_msg(response).expect("test message should succeed");
        let outcome = OperationResult::Response(payload.into());
        let finished = Operation::new().set_done(true).set_result(outcome);
        Ok(Response::from(finished))
    }

    // Builds an operation that does not set `done`, and whose metadata
    // reports `progress` percent.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let update = OperationMetadata::new().set_progress_percent(progress);
        let packed = wkt::Any::from_msg(&update).expect("test message should succeed");
        Ok(Response::from(Operation::new().set_metadata(packed)))
    }

    #[tokio::test]
    async fn manual_polling_with_metadata() -> Result<()> {
        let mut order = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        // Starting the LRO reports 25% progress.
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut order)
            .returning(|_, _| make_partial_operation(25));
        // The next two polls report further partial progress.
        for progress in [50, 75] {
            mock.expect_get_operation()
                .once()
                .in_sequence(&mut order)
                .returning(move |_, _| make_partial_operation(progress));
        }
        // The last poll reports the finished operation.
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut order)
            .returning(|_, _| make_finished_operation(&expected_response()));

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let BatchRecognizeResult {
            progress_updates,
            billed_duration,
        } = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(progress_updates, [25, 50, 75]);
        assert_eq!(billed_duration?, expected_duration());
        Ok(())
    }
}
Simulating errors
Errors can arise in an LRO from a few places.
If your application uses automatic polling, the following cases are all
equivalent: until_done() returns the error in the Result, regardless of
where it originated.
Simulating an error starting an LRO will
yield the simplest test.
Note that the stubbed out client does not have a retry or polling policy. In all cases, the polling loop will terminate on the first error, even if the error is typically considered transient.
Simulating an error starting an LRO
The simplest way to simulate an error is to have the initial request fail with an error.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    /// What `my_manual_poller` observed while polling a `BatchRecognize` LRO.
    pub struct BatchRecognizeResult {
        /// The `progress_percent` of each partial update that carried
        /// metadata, in the order received.
        pub progress_updates: Vec<i32>,
        /// The `total_billed_duration` of the final response, or the error
        /// that ended the polling loop.
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    /// Starts a `BatchRecognize` LRO and polls it manually until it finishes.
    ///
    /// Partial updates contribute their `progress_percent` field to the
    /// result. The final response contributes its `total_billed_duration`
    /// field. A polling error ends the loop and is returned in
    /// `billed_duration`.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller();
        let mut progress_updates = Vec::new();

        while let Some(outcome) = poller.poll().await {
            let billed_duration = match outcome {
                PollingResult::InProgress(metadata) => {
                    // This is a silly application. Your application likely
                    // performs some task immediately with the partial
                    // update, instead of storing it for after the operation
                    // has completed.
                    if let Some(partial) = metadata {
                        progress_updates.push(partial.progress_percent);
                    }
                    // Wait before polling again.
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                    continue;
                }
                PollingResult::Completed(response) => response.map(|r| r.total_billed_duration),
                PollingResult::PollingError(error) => Err(error),
            };
            // Both the completed and the error branches end the loop.
            return BatchRecognizeResult {
                progress_updates,
                billed_duration,
            };
        }
        // We can only get here if `poll()` returns `None`, but it only returns
        // `None` after it returned `PollingResult::Completed`. Therefore this
        // is never reached.
        unreachable!("loop should exit via the `Completed` branch.");
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // A mock of the `Speech` stub. It mocks `get_operation()` in addition to
    // `batch_recognize()`, because the tests set expectations on both.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // Builds an operation that does not set `done`, and whose metadata
    // reports `progress` percent.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let metadata = OperationMetadata::new().set_progress_percent(progress);
        let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
        let operation = Operation::new().set_metadata(any);
        Ok(Response::from(operation))
    }

    // Builds an operation that is marked as done and carries `status` as its
    // error result. Note that the RPC itself succeeds: the outer `Result` is
    // `Ok`, and the failure is reported inside the operation.
    fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
        let operation = Operation::new()
            .set_done(true)
            .set_result(OperationResult::Error(status.into()));
        Ok(Response::from(operation))
    }

    // Simulates the initial request failing, before any LRO exists.
    #[tokio::test]
    async fn error_starting_lro() -> Result<()> {
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize().return_once(|_, _| {
            use gax::error::Error;
            use gax::error::rpc::{Code, Status};
            // Keep the status code and the message consistent.
            let status = Status::default()
                .set_code(Code::ResourceExhausted)
                .set_message("Resource exhausted");
            Err(Error::service(status))
        });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the final result. The LRO never started, so there are no
        // partial updates.
        assert!(
            result.progress_updates.is_empty(),
            "{:?}",
            result.progress_updates
        );
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );
        Ok(())
    }

    // Simulates an LRO that reports progress and then finishes with an error.
    #[tokio::test]
    async fn lro_ending_in_error() -> Result<()> {
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(50));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(75));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                // This is a common error for `Create*` RPCs, which are often
                // LROs. It is less applicable to `BatchRecognize` in practice.
                let status = rpc::model::Status::default()
                    .set_code(gax::error::rpc::Code::AlreadyExists as i32)
                    .set_message("resource already exists");
                make_failed_operation(status)
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25, 50, 75]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );
        Ok(())
    }

    // Simulates a polling request that fails while the LRO is in progress.
    #[tokio::test]
    async fn polling_loop_error() -> Result<()> {
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                use gax::error::Error;
                use gax::error::rpc::{Code, Status};
                let status = Status::default()
                    .set_code(Code::Aborted)
                    .set_message("Operation was aborted");
                Err(Error::service(status))
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );
        Ok(())
    }
}
For manual polling, an error starting an LRO is returned via the completed branch. This ends the polling loop.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    /// What `my_manual_poller` observed while polling a `BatchRecognize` LRO.
    pub struct BatchRecognizeResult {
        /// The `progress_percent` of each partial update that carried
        /// metadata, in the order received.
        pub progress_updates: Vec<i32>,
        /// The `total_billed_duration` of the final response, or the error
        /// that ended the polling loop.
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    /// Starts a `BatchRecognize` LRO and polls it manually until it finishes.
    ///
    /// Partial updates contribute their `progress_percent` field to the
    /// result. The final response contributes its `total_billed_duration`
    /// field. A polling error ends the loop and is returned in
    /// `billed_duration`.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller();
        let mut progress_updates = Vec::new();

        while let Some(outcome) = poller.poll().await {
            let billed_duration = match outcome {
                PollingResult::InProgress(metadata) => {
                    // This is a silly application. Your application likely
                    // performs some task immediately with the partial
                    // update, instead of storing it for after the operation
                    // has completed.
                    if let Some(partial) = metadata {
                        progress_updates.push(partial.progress_percent);
                    }
                    // Wait before polling again.
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                    continue;
                }
                PollingResult::Completed(response) => response.map(|r| r.total_billed_duration),
                PollingResult::PollingError(error) => Err(error),
            };
            // Both the completed and the error branches end the loop.
            return BatchRecognizeResult {
                progress_updates,
                billed_duration,
            };
        }
        // We can only get here if `poll()` returns `None`, but it only returns
        // `None` after it returned `PollingResult::Completed`. Therefore this
        // is never reached.
        unreachable!("loop should exit via the `Completed` branch.");
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // A mock of the `Speech` stub. It mocks `get_operation()` in addition to
    // `batch_recognize()`, because the tests set expectations on both.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // Builds an operation that does not set `done`, and whose metadata
    // reports `progress` percent.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let metadata = OperationMetadata::new().set_progress_percent(progress);
        let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
        let operation = Operation::new().set_metadata(any);
        Ok(Response::from(operation))
    }

    // Builds an operation that is marked as done and carries `status` as its
    // error result. Note that the RPC itself succeeds: the outer `Result` is
    // `Ok`, and the failure is reported inside the operation.
    fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
        let operation = Operation::new()
            .set_done(true)
            .set_result(OperationResult::Error(status.into()));
        Ok(Response::from(operation))
    }

    // Simulates the initial request failing, before any LRO exists.
    #[tokio::test]
    async fn error_starting_lro() -> Result<()> {
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize().return_once(|_, _| {
            use gax::error::Error;
            use gax::error::rpc::{Code, Status};
            // Keep the status code and the message consistent.
            let status = Status::default()
                .set_code(Code::ResourceExhausted)
                .set_message("Resource exhausted");
            Err(Error::service(status))
        });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the final result. The LRO never started, so there are no
        // partial updates.
        assert!(
            result.progress_updates.is_empty(),
            "{:?}",
            result.progress_updates
        );
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );
        Ok(())
    }

    // Simulates an LRO that reports progress and then finishes with an error.
    #[tokio::test]
    async fn lro_ending_in_error() -> Result<()> {
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(50));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(75));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                // This is a common error for `Create*` RPCs, which are often
                // LROs. It is less applicable to `BatchRecognize` in practice.
                let status = rpc::model::Status::default()
                    .set_code(gax::error::rpc::Code::AlreadyExists as i32)
                    .set_message("resource already exists");
                make_failed_operation(status)
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25, 50, 75]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );
        Ok(())
    }

    // Simulates a polling request that fails while the LRO is in progress.
    #[tokio::test]
    async fn polling_loop_error() -> Result<()> {
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                use gax::error::Error;
                use gax::error::rpc::{Code, Status};
                let status = Status::default()
                    .set_code(Code::Aborted)
                    .set_message("Operation was aborted");
                Err(Error::service(status))
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );
        Ok(())
    }
}
Simulating an LRO resulting in an error
If you need to simulate an LRO resulting in an error, after intermediate
metadata is returned, you need to return the error in the final
longrunning::model::Operation.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    /// What `my_manual_poller` observed while polling a `BatchRecognize` LRO.
    pub struct BatchRecognizeResult {
        /// The `progress_percent` of each partial update that carried
        /// metadata, in the order received.
        pub progress_updates: Vec<i32>,
        /// The `total_billed_duration` of the final response, or the error
        /// that ended the polling loop.
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    /// Starts a `BatchRecognize` LRO and polls it manually until it finishes.
    ///
    /// Partial updates contribute their `progress_percent` field to the
    /// result. The final response contributes its `total_billed_duration`
    /// field. A polling error ends the loop and is returned in
    /// `billed_duration`.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller();
        let mut progress_updates = Vec::new();

        while let Some(outcome) = poller.poll().await {
            let billed_duration = match outcome {
                PollingResult::InProgress(metadata) => {
                    // This is a silly application. Your application likely
                    // performs some task immediately with the partial
                    // update, instead of storing it for after the operation
                    // has completed.
                    if let Some(partial) = metadata {
                        progress_updates.push(partial.progress_percent);
                    }
                    // Wait before polling again.
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                    continue;
                }
                PollingResult::Completed(response) => response.map(|r| r.total_billed_duration),
                PollingResult::PollingError(error) => Err(error),
            };
            // Both the completed and the error branches end the loop.
            return BatchRecognizeResult {
                progress_updates,
                billed_duration,
            };
        }
        // We can only get here if `poll()` returns `None`, but it only returns
        // `None` after it returned `PollingResult::Completed`. Therefore this
        // is never reached.
        unreachable!("loop should exit via the `Completed` branch.");
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Mock implementation of the `Speech` stub. Only the two RPCs that these
    // tests set expectations on are declared: `batch_recognize` (starts the
    // LRO) and `get_operation` (polls it).
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // Builds a successful RPC response carrying an `Operation` whose metadata
    // reports `progress` percent. Neither `done` nor a result is set.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let metadata = OperationMetadata::new().set_progress_percent(progress);
        let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
        let operation = Operation::new().set_metadata(any);
        Ok(Response::from(operation))
    }

    // Builds a successful RPC response carrying an `Operation` that is marked
    // as done and whose result is the error `status`. The RPC succeeds; the
    // failure is reported inside the operation.
    fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
        let operation = Operation::new()
            .set_done(true)
            .set_result(OperationResult::Error(status.into()));
        Ok(Response::from(operation))
    }

    // The RPC that starts the LRO fails, so no operation is ever polled.
    #[tokio::test]
    async fn error_starting_lro() -> Result<()> {
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize().return_once(|_, _| {
            use gax::error::Error;
            use gax::error::rpc::{Code, Status};
            // Keep the status code consistent with the message.
            let status = Status::default()
                .set_code(Code::ResourceExhausted)
                .set_message("Resource exhausted");
            Err(Error::service(status))
        });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the final result. No partial update can have been recorded
        // because the mock never returned an operation.
        assert!(
            result.progress_updates.is_empty(),
            "{:?}",
            result.progress_updates
        );
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }

    // The LRO reports progress three times and then completes with an error
    // stored in the final `Operation`.
    #[tokio::test]
    async fn lro_ending_in_error() -> Result<()> {
        // The sequence requires the expectations to be matched in this order.
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(50));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(75));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                // This is a common error for `Create*` RPCs, which are often
                // LROs. It is less applicable to `BatchRecognize` in practice.
                let status = rpc::model::Status::default()
                    .set_code(gax::error::rpc::Code::AlreadyExists as i32)
                    .set_message("resource already exists");
                make_failed_operation(status)
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25, 50, 75]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }

    // The LRO starts and reports progress once, then the polling RPC itself
    // fails.
    #[tokio::test]
    async fn polling_loop_error() -> Result<()> {
        // The sequence requires the expectations to be matched in this order.
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                use gax::error::Error;
                use gax::error::rpc::{Code, Status};
                let status = Status::default()
                    .set_code(Code::Aborted)
                    .set_message("Operation was aborted");
                Err(Error::service(status))
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }
}
Set expectations to return the Operation from get_operation as before.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    // What `my_manual_poller` hands back: every progress percentage observed
    // while polling, plus either the billed duration from the final response
    // or the error that ended the polling loop.
    pub struct BatchRecognizeResult {
        pub progress_updates: Vec<i32>,
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    // An example application function that polls an LRO by hand.
    //
    // It starts a `BatchRecognize` LRO and consolidates everything the poller
    // reports. Partial updates contribute their `progress_percent` field; the
    // final result contributes its `total_billed_duration` field. Errors, from
    // the operation itself or from the polling loop, end up in
    // `billed_duration`.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client.batch_recognize().set_recognizer(recognizer).poller();
        let mut progress_updates = Vec::new();

        let billed_duration = loop {
            let Some(outcome) = poller.poll().await else {
                // `poll()` only returns `None` after it has returned
                // `PollingResult::Completed`, and that branch leaves the loop
                // below. Therefore this is never reached.
                unreachable!("loop should exit via the `Completed` branch.");
            };
            match outcome {
                PollingResult::Completed(response) => {
                    break response.map(|r| r.total_billed_duration);
                }
                PollingResult::PollingError(error) => break Err(error),
                PollingResult::InProgress(Some(metadata)) => {
                    // This is a silly application. Your application likely
                    // acts on the partial update right away, instead of
                    // storing it until the operation has completed.
                    progress_updates.push(metadata.progress_percent);
                }
                // An in-progress poll without metadata has nothing to record.
                PollingResult::InProgress(None) => {}
            }
            // Wait before the next poll while the operation is in progress.
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        };

        BatchRecognizeResult {
            progress_updates,
            billed_duration,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Mock implementation of the `Speech` stub. Only the two RPCs that these
    // tests set expectations on are declared: `batch_recognize` (starts the
    // LRO) and `get_operation` (polls it).
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // Builds a successful RPC response carrying an `Operation` whose metadata
    // reports `progress` percent. Neither `done` nor a result is set.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let metadata = OperationMetadata::new().set_progress_percent(progress);
        let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
        let operation = Operation::new().set_metadata(any);
        Ok(Response::from(operation))
    }

    // Builds a successful RPC response carrying an `Operation` that is marked
    // as done and whose result is the error `status`. The RPC succeeds; the
    // failure is reported inside the operation.
    fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
        let operation = Operation::new()
            .set_done(true)
            .set_result(OperationResult::Error(status.into()));
        Ok(Response::from(operation))
    }

    // The RPC that starts the LRO fails, so no operation is ever polled.
    #[tokio::test]
    async fn error_starting_lro() -> Result<()> {
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize().return_once(|_, _| {
            use gax::error::Error;
            use gax::error::rpc::{Code, Status};
            // Keep the status code consistent with the message.
            let status = Status::default()
                .set_code(Code::ResourceExhausted)
                .set_message("Resource exhausted");
            Err(Error::service(status))
        });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the final result. No partial update can have been recorded
        // because the mock never returned an operation.
        assert!(
            result.progress_updates.is_empty(),
            "{:?}",
            result.progress_updates
        );
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }

    // The LRO reports progress three times and then completes with an error
    // stored in the final `Operation`.
    #[tokio::test]
    async fn lro_ending_in_error() -> Result<()> {
        // The sequence requires the expectations to be matched in this order.
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(50));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(75));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                // This is a common error for `Create*` RPCs, which are often
                // LROs. It is less applicable to `BatchRecognize` in practice.
                let status = rpc::model::Status::default()
                    .set_code(gax::error::rpc::Code::AlreadyExists as i32)
                    .set_message("resource already exists");
                make_failed_operation(status)
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25, 50, 75]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }

    // The LRO starts and reports progress once, then the polling RPC itself
    // fails.
    #[tokio::test]
    async fn polling_loop_error() -> Result<()> {
        // The sequence requires the expectations to be matched in this order.
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                use gax::error::Error;
                use gax::error::rpc::{Code, Status};
                let status = Status::default()
                    .set_code(Code::Aborted)
                    .set_message("Operation was aborted");
                Err(Error::service(status))
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }
}
To simulate an LRO that completes with an error outcome, set the
PollingResult::Completed branch to contain the error. This ends the polling
loop.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    // What `my_manual_poller` hands back: every progress percentage observed
    // while polling, plus either the billed duration from the final response
    // or the error that ended the polling loop.
    pub struct BatchRecognizeResult {
        pub progress_updates: Vec<i32>,
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    // An example application function that polls an LRO by hand.
    //
    // It starts a `BatchRecognize` LRO and consolidates everything the poller
    // reports. Partial updates contribute their `progress_percent` field; the
    // final result contributes its `total_billed_duration` field. Errors, from
    // the operation itself or from the polling loop, end up in
    // `billed_duration`.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client.batch_recognize().set_recognizer(recognizer).poller();
        let mut progress_updates = Vec::new();

        let billed_duration = loop {
            let Some(outcome) = poller.poll().await else {
                // `poll()` only returns `None` after it has returned
                // `PollingResult::Completed`, and that branch leaves the loop
                // below. Therefore this is never reached.
                unreachable!("loop should exit via the `Completed` branch.");
            };
            match outcome {
                PollingResult::Completed(response) => {
                    break response.map(|r| r.total_billed_duration);
                }
                PollingResult::PollingError(error) => break Err(error),
                PollingResult::InProgress(Some(metadata)) => {
                    // This is a silly application. Your application likely
                    // acts on the partial update right away, instead of
                    // storing it until the operation has completed.
                    progress_updates.push(metadata.progress_percent);
                }
                // An in-progress poll without metadata has nothing to record.
                PollingResult::InProgress(None) => {}
            }
            // Wait before the next poll while the operation is in progress.
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        };

        BatchRecognizeResult {
            progress_updates,
            billed_duration,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Mock implementation of the `Speech` stub. Only the two RPCs that these
    // tests set expectations on are declared: `batch_recognize` (starts the
    // LRO) and `get_operation` (polls it).
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // Builds a successful RPC response carrying an `Operation` whose metadata
    // reports `progress` percent. Neither `done` nor a result is set.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let metadata = OperationMetadata::new().set_progress_percent(progress);
        let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
        let operation = Operation::new().set_metadata(any);
        Ok(Response::from(operation))
    }

    // Builds a successful RPC response carrying an `Operation` that is marked
    // as done and whose result is the error `status`. The RPC succeeds; the
    // failure is reported inside the operation.
    fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
        let operation = Operation::new()
            .set_done(true)
            .set_result(OperationResult::Error(status.into()));
        Ok(Response::from(operation))
    }

    // The RPC that starts the LRO fails, so no operation is ever polled.
    #[tokio::test]
    async fn error_starting_lro() -> Result<()> {
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize().return_once(|_, _| {
            use gax::error::Error;
            use gax::error::rpc::{Code, Status};
            // Keep the status code consistent with the message.
            let status = Status::default()
                .set_code(Code::ResourceExhausted)
                .set_message("Resource exhausted");
            Err(Error::service(status))
        });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the final result. No partial update can have been recorded
        // because the mock never returned an operation.
        assert!(
            result.progress_updates.is_empty(),
            "{:?}",
            result.progress_updates
        );
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }

    // The LRO reports progress three times and then completes with an error
    // stored in the final `Operation`.
    #[tokio::test]
    async fn lro_ending_in_error() -> Result<()> {
        // The sequence requires the expectations to be matched in this order.
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(50));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(75));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                // This is a common error for `Create*` RPCs, which are often
                // LROs. It is less applicable to `BatchRecognize` in practice.
                let status = rpc::model::Status::default()
                    .set_code(gax::error::rpc::Code::AlreadyExists as i32)
                    .set_message("resource already exists");
                make_failed_operation(status)
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25, 50, 75]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }

    // The LRO starts and reports progress once, then the polling RPC itself
    // fails.
    #[tokio::test]
    async fn polling_loop_error() -> Result<()> {
        // The sequence requires the expectations to be matched in this order.
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                use gax::error::Error;
                use gax::error::rpc::{Code, Status};
                let status = Status::default()
                    .set_code(Code::Aborted)
                    .set_message("Operation was aborted");
                Err(Error::service(status))
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }
}
Simulating a polling error
Polling loops can also exit because the polling policy has been exhausted. When this happens, the client library cannot say definitively whether the LRO has completed or not.
If your application has custom logic to deal with this case, you can exercise
this logic by returning an error from the get_operation expectation.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    // What `my_manual_poller` hands back: every progress percentage observed
    // while polling, plus either the billed duration from the final response
    // or the error that ended the polling loop.
    pub struct BatchRecognizeResult {
        pub progress_updates: Vec<i32>,
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    // An example application function that polls an LRO by hand.
    //
    // It starts a `BatchRecognize` LRO and consolidates everything the poller
    // reports. Partial updates contribute their `progress_percent` field; the
    // final result contributes its `total_billed_duration` field. Errors, from
    // the operation itself or from the polling loop, end up in
    // `billed_duration`.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client.batch_recognize().set_recognizer(recognizer).poller();
        let mut progress_updates = Vec::new();

        let billed_duration = loop {
            let Some(outcome) = poller.poll().await else {
                // `poll()` only returns `None` after it has returned
                // `PollingResult::Completed`, and that branch leaves the loop
                // below. Therefore this is never reached.
                unreachable!("loop should exit via the `Completed` branch.");
            };
            match outcome {
                PollingResult::Completed(response) => {
                    break response.map(|r| r.total_billed_duration);
                }
                PollingResult::PollingError(error) => break Err(error),
                PollingResult::InProgress(Some(metadata)) => {
                    // This is a silly application. Your application likely
                    // acts on the partial update right away, instead of
                    // storing it until the operation has completed.
                    progress_updates.push(metadata.progress_percent);
                }
                // An in-progress poll without metadata has nothing to record.
                PollingResult::InProgress(None) => {}
            }
            // Wait before the next poll while the operation is in progress.
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        };

        BatchRecognizeResult {
            progress_updates,
            billed_duration,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Mock implementation of the `Speech` stub. Only the two RPCs that these
    // tests set expectations on are declared: `batch_recognize` (starts the
    // LRO) and `get_operation` (polls it).
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // Builds a successful RPC response carrying an `Operation` whose metadata
    // reports `progress` percent. Neither `done` nor a result is set.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let metadata = OperationMetadata::new().set_progress_percent(progress);
        let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
        let operation = Operation::new().set_metadata(any);
        Ok(Response::from(operation))
    }

    // Builds a successful RPC response carrying an `Operation` that is marked
    // as done and whose result is the error `status`. The RPC succeeds; the
    // failure is reported inside the operation.
    fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
        let operation = Operation::new()
            .set_done(true)
            .set_result(OperationResult::Error(status.into()));
        Ok(Response::from(operation))
    }

    // The RPC that starts the LRO fails, so no operation is ever polled.
    #[tokio::test]
    async fn error_starting_lro() -> Result<()> {
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize().return_once(|_, _| {
            use gax::error::Error;
            use gax::error::rpc::{Code, Status};
            // Keep the status code consistent with the message.
            let status = Status::default()
                .set_code(Code::ResourceExhausted)
                .set_message("Resource exhausted");
            Err(Error::service(status))
        });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the final result. No partial update can have been recorded
        // because the mock never returned an operation.
        assert!(
            result.progress_updates.is_empty(),
            "{:?}",
            result.progress_updates
        );
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }

    // The LRO reports progress three times and then completes with an error
    // stored in the final `Operation`.
    #[tokio::test]
    async fn lro_ending_in_error() -> Result<()> {
        // The sequence requires the expectations to be matched in this order.
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(50));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(75));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                // This is a common error for `Create*` RPCs, which are often
                // LROs. It is less applicable to `BatchRecognize` in practice.
                let status = rpc::model::Status::default()
                    .set_code(gax::error::rpc::Code::AlreadyExists as i32)
                    .set_message("resource already exists");
                make_failed_operation(status)
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25, 50, 75]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }

    // The LRO starts and reports progress once, then the polling RPC itself
    // fails.
    #[tokio::test]
    async fn polling_loop_error() -> Result<()> {
        // The sequence requires the expectations to be matched in this order.
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                use gax::error::Error;
                use gax::error::rpc::{Code, Status};
                let status = Status::default()
                    .set_code(Code::Aborted)
                    .set_message("Operation was aborted");
                Err(Error::service(status))
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }
}
To simulate an LRO polling error, set the PollingResult::PollingError branch
with the error you want to simulate.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    // What `my_manual_poller` hands back: every progress percentage observed
    // while polling, plus either the billed duration from the final response
    // or the error that ended the polling loop.
    pub struct BatchRecognizeResult {
        pub progress_updates: Vec<i32>,
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    // An example application function that polls an LRO by hand.
    //
    // It starts a `BatchRecognize` LRO and consolidates everything the poller
    // reports. Partial updates contribute their `progress_percent` field; the
    // final result contributes its `total_billed_duration` field. Errors, from
    // the operation itself or from the polling loop, end up in
    // `billed_duration`.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client.batch_recognize().set_recognizer(recognizer).poller();
        let mut progress_updates = Vec::new();

        let billed_duration = loop {
            let Some(outcome) = poller.poll().await else {
                // `poll()` only returns `None` after it has returned
                // `PollingResult::Completed`, and that branch leaves the loop
                // below. Therefore this is never reached.
                unreachable!("loop should exit via the `Completed` branch.");
            };
            match outcome {
                PollingResult::Completed(response) => {
                    break response.map(|r| r.total_billed_duration);
                }
                PollingResult::PollingError(error) => break Err(error),
                PollingResult::InProgress(Some(metadata)) => {
                    // This is a silly application. Your application likely
                    // acts on the partial update right away, instead of
                    // storing it until the operation has completed.
                    progress_updates.push(metadata.progress_percent);
                }
                // An in-progress poll without metadata has nothing to record.
                PollingResult::InProgress(None) => {}
            }
            // Wait before the next poll while the operation is in progress.
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        };

        BatchRecognizeResult {
            progress_updates,
            billed_duration,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // Mock implementation of the `Speech` stub. Only the two RPCs that these
    // tests set expectations on are declared: `batch_recognize` (starts the
    // LRO) and `get_operation` (polls it).
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // Builds a successful RPC response carrying an `Operation` whose metadata
    // reports `progress` percent. Neither `done` nor a result is set.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let metadata = OperationMetadata::new().set_progress_percent(progress);
        let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
        let operation = Operation::new().set_metadata(any);
        Ok(Response::from(operation))
    }

    // Builds a successful RPC response carrying an `Operation` that is marked
    // as done and whose result is the error `status`. The RPC succeeds; the
    // failure is reported inside the operation.
    fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
        let operation = Operation::new()
            .set_done(true)
            .set_result(OperationResult::Error(status.into()));
        Ok(Response::from(operation))
    }

    // The RPC that starts the LRO fails, so no operation is ever polled.
    #[tokio::test]
    async fn error_starting_lro() -> Result<()> {
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize().return_once(|_, _| {
            use gax::error::Error;
            use gax::error::rpc::{Code, Status};
            // Keep the status code consistent with the message.
            let status = Status::default()
                .set_code(Code::ResourceExhausted)
                .set_message("Resource exhausted");
            Err(Error::service(status))
        });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the final result. No partial update can have been recorded
        // because the mock never returned an operation.
        assert!(
            result.progress_updates.is_empty(),
            "{:?}",
            result.progress_updates
        );
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }

    // The LRO reports progress three times and then completes with an error
    // stored in the final `Operation`.
    #[tokio::test]
    async fn lro_ending_in_error() -> Result<()> {
        // The sequence requires the expectations to be matched in this order.
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(50));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(75));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                // This is a common error for `Create*` RPCs, which are often
                // LROs. It is less applicable to `BatchRecognize` in practice.
                let status = rpc::model::Status::default()
                    .set_code(gax::error::rpc::Code::AlreadyExists as i32)
                    .set_message("resource already exists");
                make_failed_operation(status)
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25, 50, 75]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }

    // The LRO starts and reports progress once, then the polling RPC itself
    // fails.
    #[tokio::test]
    async fn polling_loop_error() -> Result<()> {
        // The sequence requires the expectations to be matched in this order.
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                use gax::error::Error;
                use gax::error::rpc::{Code, Status};
                let status = Status::default()
                    .set_code(Code::Aborted)
                    .set_message("Operation was aborted");
                Err(Error::service(status))
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );

        Ok(())
    }
}
Automatic polling - Full test
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
    use super::*;

    // Application code that relies on automatic polling.
    //
    // Starts a `BatchRecognize` LRO on the `_` recognizer in the `global`
    // location of `project_id`, waits until the poller reports it is done,
    // and returns the `total_billed_duration` field of the response. Any
    // error is returned to the caller unchanged.
    pub async fn my_automatic_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> Result<Option<wkt::Duration>> {
        use google_cloud_lro::Poller;

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let response = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller()
            .until_done()
            .await?;
        Ok(response.total_billed_duration)
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // A mock of the `Speech` stub. Only the RPC used by the code under test is
    // declared.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // The billed duration that the simulated LRO reports.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    // The final response of the simulated LRO.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    // Builds a successful RPC response holding an `Operation` that is marked
    // done and carries `response` as its result.
    fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
        let payload = wkt::Any::from_msg(response).expect("test message should succeed");
        let result = OperationResult::Response(payload.into());
        let finished = Operation::new().set_done(true).set_result(result);
        Ok(Response::from(finished))
    }

    // The mock returns a finished operation from the initial RPC, so the
    // `get_operation` RPC is not declared in the mock.
    #[tokio::test]
    async fn automatic_polling() -> Result<()> {
        // Create a mock, and set expectations on it.
        let mut stub = MockSpeech::new();
        stub.expect_batch_recognize()
            .return_once(|_, _| make_finished_operation(&expected_response()));

        // Create a client backed by the mock, and call the code under test.
        let client = speech::client::Speech::from_stub(stub);
        let actual = my_automatic_poller(&client, "my-project").await?;

        // Verify the final result of the LRO.
        assert_eq!(actual, expected_duration());
        Ok(())
    }
}
Manual polling with intermediate metadata - Full test
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    // What `my_manual_poller` collected while polling a `BatchRecognize` LRO.
    pub struct BatchRecognizeResult {
        // The `progress_percent` of each partial update, in the order received.
        pub progress_updates: Vec<i32>,
        // The `total_billed_duration` of the final response, or the error that
        // ended the polling loop.
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    // Application code that polls an LRO manually.
    //
    // Starts a `BatchRecognize` LRO and polls it, waiting 500ms after each
    // partial update. The `progress_percent` of every partial update is
    // recorded. The function returns once the poller reports a final result
    // (successful or not) or a polling error.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller();
        let mut progress_updates: Vec<i32> = Vec::new();

        loop {
            let billed_duration = match poller.poll().await {
                Some(PollingResult::InProgress(metadata)) => {
                    // This is a silly application. Your application likely
                    // acts on the partial update right away, instead of
                    // keeping it until the operation has completed.
                    progress_updates.extend(metadata.map(|m| m.progress_percent));
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                    continue;
                }
                Some(PollingResult::Completed(response)) => {
                    response.map(|r| r.total_billed_duration)
                }
                Some(PollingResult::PollingError(error)) => Err(error),
                // `poll()` only returns `None` after it has returned
                // `PollingResult::Completed`, and the loop has returned by
                // then. Therefore this is never reached.
                None => unreachable!("loop should exit via the `Completed` branch."),
            };
            return BatchRecognizeResult {
                progress_updates,
                billed_duration,
            };
        }
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // A mock of the `Speech` stub. It declares the RPC that starts the LRO and
    // the RPC used to poll it.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // The billed duration that the simulated LRO reports.
    fn expected_duration() -> Option<wkt::Duration> {
        let billed = wkt::Duration::clamp(100, 0);
        Some(billed)
    }

    // The final response of the simulated LRO.
    fn expected_response() -> BatchRecognizeResponse {
        let billed = expected_duration();
        BatchRecognizeResponse::new().set_or_clear_total_billed_duration(billed)
    }

    // Builds a successful RPC response holding an `Operation` that is marked
    // done and carries `response` as its result.
    fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
        let payload = wkt::Any::from_msg(response).expect("test message should succeed");
        let result = OperationResult::Response(payload.into());
        let finished = Operation::new().set_done(true).set_result(result);
        Ok(Response::from(finished))
    }

    // Builds a successful RPC response holding an `Operation` that only has
    // metadata, with `progress` as its `progress_percent`. Neither `done` nor
    // a result is set.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let report = OperationMetadata::new().set_progress_percent(progress);
        let packed = wkt::Any::from_msg(&report).expect("test message should succeed");
        Ok(Response::from(Operation::new().set_metadata(packed)))
    }

    // The mock reports 25% when the operation starts, 50% and 75% on the next
    // two polls, and the finished operation on the last poll.
    #[tokio::test]
    async fn manual_polling_with_metadata() -> Result<()> {
        let mut order = mockall::Sequence::new();
        let mut stub = MockSpeech::new();

        // Starting the operation yields the first partial update.
        stub.expect_batch_recognize()
            .times(1)
            .in_sequence(&mut order)
            .returning(|_, _| make_partial_operation(25));
        // The next two polls report more progress.
        for progress in [50, 75] {
            stub.expect_get_operation()
                .times(1)
                .in_sequence(&mut order)
                .returning(move |_, _| make_partial_operation(progress));
        }
        // The last poll returns the finished operation.
        stub.expect_get_operation()
            .times(1)
            .in_sequence(&mut order)
            .returning(|_, _| make_finished_operation(&expected_response()));

        // Create a client backed by the mock, and run the manual poller.
        let client = speech::client::Speech::from_stub(stub);
        let outcome = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(outcome.progress_updates, [25, 50, 75]);
        assert_eq!(outcome.billed_duration?, expected_duration());
        Ok(())
    }
}
Simulating errors - Full tests
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
    use super::*;

    // The outcome of `my_manual_poller`.
    pub struct BatchRecognizeResult {
        // Every `progress_percent` seen in a partial update, oldest first.
        pub progress_updates: Vec<i32>,
        // On success, the `total_billed_duration` of the final response.
        // Otherwise the error reported by the poller.
        pub billed_duration: Result<Option<wkt::Duration>>,
    }

    // Application code that polls an LRO manually.
    //
    // It starts a `BatchRecognize` LRO and consolidates the polling results:
    // - a partial update contributes its `progress_percent`,
    // - a final result contributes its `total_billed_duration` (or its error),
    // - a polling error ends the loop and is returned as the result.
    //
    // The loop waits 500ms between polls.
    pub async fn my_manual_poller(
        client: &speech::client::Speech,
        project_id: &str,
    ) -> BatchRecognizeResult {
        use google_cloud_lro::{Poller, PollingResult};

        let recognizer = format!("projects/{project_id}/locations/global/recognizers/_");
        let mut poller = client
            .batch_recognize()
            .set_recognizer(recognizer)
            .poller();
        let mut progress_updates: Vec<i32> = Vec::new();

        loop {
            match poller.poll().await {
                // `poll()` only returns `None` after it has returned
                // `PollingResult::Completed`, and the function has returned by
                // then. Therefore this is never reached.
                None => unreachable!("loop should exit via the `Completed` branch."),
                Some(PollingResult::InProgress(partial)) => {
                    // This is a silly application. Your application likely
                    // acts on the partial update right away, instead of
                    // keeping it until the operation has completed.
                    if let Some(metadata) = partial {
                        progress_updates.push(metadata.progress_percent);
                    }
                }
                Some(PollingResult::Completed(outcome)) => {
                    return BatchRecognizeResult {
                        progress_updates,
                        billed_duration: outcome.map(|r| r.total_billed_duration),
                    };
                }
                Some(PollingResult::PollingError(error)) => {
                    return BatchRecognizeResult {
                        progress_updates,
                        billed_duration: Err(error),
                    };
                }
            }
            // Only reached after a partial update: wait before polling again.
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::my_application::*;
    use super::*;

    // A mock of the `Speech` stub. It declares the RPC that starts the LRO and
    // the RPC used to poll it.
    mockall::mock! {
        #[derive(Debug)]
        Speech {}
        impl speech::stub::Speech for Speech {
            async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
            async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
        }
    }

    // Builds a successful RPC response holding an `Operation` that only has
    // metadata, with `progress` as its `progress_percent`. Neither `done` nor
    // a result is set.
    fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
        let metadata = OperationMetadata::new().set_progress_percent(progress);
        let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
        let operation = Operation::new().set_metadata(any);
        Ok(Response::from(operation))
    }

    // Builds a successful RPC response holding an `Operation` that is marked
    // done and carries `status` as its error result.
    //
    // Note that the RPC itself succeeds: the error is inside the `Operation`.
    fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
        let operation = Operation::new()
            .set_done(true)
            .set_result(OperationResult::Error(status.into()));
        Ok(Response::from(operation))
    }

    // Simulates an error in the RPC that starts the LRO.
    #[tokio::test]
    async fn error_starting_lro() -> Result<()> {
        let mut mock = MockSpeech::new();
        mock.expect_batch_recognize().return_once(|_, _| {
            use gax::error::Error;
            use gax::error::rpc::{Code, Status};
            // The status code and the message describe the same condition.
            let status = Status::default()
                .set_code(Code::ResourceExhausted)
                .set_message("Resource exhausted");
            Err(Error::service(status))
        });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the final result.
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );
        Ok(())
    }

    // Simulates an LRO that reports progress and then finishes with an error.
    #[tokio::test]
    async fn lro_ending_in_error() -> Result<()> {
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        // Starting the operation yields the first partial update.
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        // The next two polls report more progress.
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(50));
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(75));
        // The last poll returns a finished operation that carries an error.
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                // This is a common error for `Create*` RPCs, which are often
                // LROs. It is less applicable to `BatchRecognize` in practice.
                let status = rpc::model::Status::default()
                    .set_code(gax::error::rpc::Code::AlreadyExists as i32)
                    .set_message("resource already exists");
                make_failed_operation(status)
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25, 50, 75]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );
        Ok(())
    }

    // Simulates a polling loop that fails before the operation completes.
    #[tokio::test]
    async fn polling_loop_error() -> Result<()> {
        let mut seq = mockall::Sequence::new();
        let mut mock = MockSpeech::new();
        // Starting the operation yields a partial update.
        mock.expect_batch_recognize()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| make_partial_operation(25));
        // The poll itself fails: the RPC returns an error, not an `Operation`.
        mock.expect_get_operation()
            .once()
            .in_sequence(&mut seq)
            .returning(|_, _| {
                use gax::error::Error;
                use gax::error::rpc::{Code, Status};
                let status = Status::default()
                    .set_code(Code::Aborted)
                    .set_message("Operation was aborted");
                Err(Error::service(status))
            });

        // Create a client, implemented by our mock.
        let client = speech::client::Speech::from_stub(mock);

        // Call our function which manually polls.
        let result = my_manual_poller(&client, "my-project").await;

        // Verify the partial metadata updates, and the final result.
        assert_eq!(result.progress_updates, [25]);
        assert!(
            result.billed_duration.is_err(),
            "{:?}",
            result.billed_duration
        );
        Ok(())
    }
}