How to write tests for long-running operations
The Google Cloud client libraries for Rust have helpers that simplify interaction with long-running operations (henceforth, LROs).
Simulating the behavior of LROs in tests involves understanding the details these helpers hide. This guide shows how to do that.
Prerequisites
This guide assumes you are familiar with the previous chapters:
Tests for automatic polling
Let's say our application code awaits lro::Poller::until_done()
. In previous
sections, we called this "automatic polling".
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
use super::*;
// An example application function that automatically polls.
//
// It starts an LRO, awaits the result, and processes it.
pub async fn my_automatic_poller(
client: &speech::client::Speech,
project_id: &str,
) -> Result<Option<wkt::Duration>> {
use google_cloud_lro::Poller;
client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller()
.until_done()
.await
.map(|r| r.total_billed_duration)
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn automatic_polling() -> Result<()> {
// Create a mock, and set expectations on it.
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.return_once(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which automatically polls.
let billed_duration = my_automatic_poller(&client, "my-project").await?;
// Verify the final result of the LRO.
assert_eq!(billed_duration, expected_duration());
Ok(())
}
}
Note that our application only cares about the final result of the LRO. We do not need to test how it handles intermediate results from polling the LRO. Our tests can simply return the final result of the LRO from the mock.
Creating the longrunning::model::Operation
Let's say we want our call to result in the following response.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
use super::*;
// An example application function that automatically polls.
//
// It starts an LRO, awaits the result, and processes it.
pub async fn my_automatic_poller(
client: &speech::client::Speech,
project_id: &str,
) -> Result<Option<wkt::Duration>> {
use google_cloud_lro::Poller;
client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller()
.until_done()
.await
.map(|r| r.total_billed_duration)
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn automatic_polling() -> Result<()> {
// Create a mock, and set expectations on it.
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.return_once(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which automatically polls.
let billed_duration = my_automatic_poller(&client, "my-project").await?;
// Verify the final result of the LRO.
assert_eq!(billed_duration, expected_duration());
Ok(())
}
}
You may have noticed that the stub returns a longrunning::model::Operation
,
not a BatchRecognizeResponse
. We need to pack our desired response into the
Operation::result
.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
use super::*;
// An example application function that automatically polls.
//
// It starts an LRO, awaits the result, and processes it.
pub async fn my_automatic_poller(
client: &speech::client::Speech,
project_id: &str,
) -> Result<Option<wkt::Duration>> {
use google_cloud_lro::Poller;
client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller()
.until_done()
.await
.map(|r| r.total_billed_duration)
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn automatic_polling() -> Result<()> {
// Create a mock, and set expectations on it.
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.return_once(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which automatically polls.
let billed_duration = my_automatic_poller(&client, "my-project").await?;
// Verify the final result of the LRO.
assert_eq!(billed_duration, expected_duration());
Ok(())
}
}
Note also that we set the done
field to true
. This indicates to the Poller
that the operation has completed, thus ending the polling loop.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
use super::*;
// An example application function that automatically polls.
//
// It starts an LRO, awaits the result, and processes it.
pub async fn my_automatic_poller(
client: &speech::client::Speech,
project_id: &str,
) -> Result<Option<wkt::Duration>> {
use google_cloud_lro::Poller;
client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller()
.until_done()
.await
.map(|r| r.total_billed_duration)
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn automatic_polling() -> Result<()> {
// Create a mock, and set expectations on it.
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.return_once(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which automatically polls.
let billed_duration = my_automatic_poller(&client, "my-project").await?;
// Verify the final result of the LRO.
assert_eq!(billed_duration, expected_duration());
Ok(())
}
}
Test code
Now we are ready to write our test.
First we define our mock class, which implements the
speech::stub::Speech
trait.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
use super::*;
// An example application function that automatically polls.
//
// It starts an LRO, awaits the result, and processes it.
pub async fn my_automatic_poller(
client: &speech::client::Speech,
project_id: &str,
) -> Result<Option<wkt::Duration>> {
use google_cloud_lro::Poller;
client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller()
.until_done()
.await
.map(|r| r.total_billed_duration)
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn automatic_polling() -> Result<()> {
// Create a mock, and set expectations on it.
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.return_once(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which automatically polls.
let billed_duration = my_automatic_poller(&client, "my-project").await?;
// Verify the final result of the LRO.
assert_eq!(billed_duration, expected_duration());
Ok(())
}
}
Now in our test we create our mock, and set expectations on it.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
use super::*;
// An example application function that automatically polls.
//
// It starts an LRO, awaits the result, and processes it.
pub async fn my_automatic_poller(
client: &speech::client::Speech,
project_id: &str,
) -> Result<Option<wkt::Duration>> {
use google_cloud_lro::Poller;
client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller()
.until_done()
.await
.map(|r| r.total_billed_duration)
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn automatic_polling() -> Result<()> {
// Create a mock, and set expectations on it.
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.return_once(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which automatically polls.
let billed_duration = my_automatic_poller(&client, "my-project").await?;
// Verify the final result of the LRO.
assert_eq!(billed_duration, expected_duration());
Ok(())
}
}
Finally, we create a client from the mock, call our function, and verify the response.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
use super::*;
// An example application function that automatically polls.
//
// It starts an LRO, awaits the result, and processes it.
pub async fn my_automatic_poller(
client: &speech::client::Speech,
project_id: &str,
) -> Result<Option<wkt::Duration>> {
use google_cloud_lro::Poller;
client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller()
.until_done()
.await
.map(|r| r.total_billed_duration)
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn automatic_polling() -> Result<()> {
// Create a mock, and set expectations on it.
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.return_once(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which automatically polls.
let billed_duration = my_automatic_poller(&client, "my-project").await?;
// Verify the final result of the LRO.
assert_eq!(billed_duration, expected_duration());
Ok(())
}
}
Tests for manual polling with intermediate metadata
Let's say our application code manually polls, and does some processing on partial updates.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(
response: &BatchRecognizeResponse,
) -> Result<gax::response::Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
#[tokio::test]
async fn manual_polling_with_metadata() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert_eq!(result.billed_duration?, expected_duration());
Ok(())
}
}
We want to simulate how our application acts when it receives intermediate metadata. We can achieve this by returning in-progress operations from our mock.
Creating the longrunning::model::Operation
The BatchRecognize
RPC returns partial results in the form of a
speech::model::OperationMetadata
. Like before, we will need to pack this into
the returned longrunning::model::Operation
, but this time into the
Operation::metadata
field.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(
response: &BatchRecognizeResponse,
) -> Result<gax::response::Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
#[tokio::test]
async fn manual_polling_with_metadata() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert_eq!(result.billed_duration?, expected_duration());
Ok(())
}
}
Test code
First we define our mock class, which implements the
speech::stub::Speech
trait. Note that we override
get_operation()
. We will see why shortly.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(
response: &BatchRecognizeResponse,
) -> Result<gax::response::Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
#[tokio::test]
async fn manual_polling_with_metadata() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert_eq!(result.billed_duration?, expected_duration());
Ok(())
}
}
Now in our test we create our mock, and set expectations on it.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(
response: &BatchRecognizeResponse,
) -> Result<gax::response::Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
#[tokio::test]
async fn manual_polling_with_metadata() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert_eq!(result.billed_duration?, expected_duration());
Ok(())
}
}
These expectations will return partial results (25%, 50%, 75%), then return our desired final outcome.
Now a few things you probably noticed.
-
The first expectation is set on
batch_recognize()
, whereas all subsequent expectations are set onget_operation()
.The initial
BatchRecognize
RPC starts the LRO on the server-side. The server returns some identifier for the LRO. This is thename
field which is omitted from the test code, for simplicity.From then on, the client library just polls the status of that LRO. It does this using the
GetOperation
RPC.That is why we set expectations on different RPCs for the initial response vs. all subsequent responses.
-
Expectations are set in a sequence.
This allows
mockall
to verify the order of the calls. It is also necessary to determine whichexpect_get_operation
is matched.
Finally, we create a client from the mock, call our function, and verify the response.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(
response: &BatchRecognizeResponse,
) -> Result<gax::response::Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
#[tokio::test]
async fn manual_polling_with_metadata() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert_eq!(result.billed_duration?, expected_duration());
Ok(())
}
}
Simulating errors
Errors can arise in an LRO from a few places.
If your application uses automatic polling, the following cases are all
equivalent: until_done()
returns the error in the Result
, regardless of
where it originated.
Simulating an error starting an LRO will
yield the simplest test.
Note that the stubbed out client does not have a retry or polling policy. In all cases, the polling loop will terminate on the first error, even if the error is typically considered transient.
Simulating an error starting an LRO
The simplest way to simulate an error is to have the initial request fail with an error.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Error(status.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn error_starting_lro() -> Result<()> {
let mut mock = MockSpeech::new();
mock.expect_batch_recognize().return_once(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Resource exhausted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the the final result.
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn lro_ending_in_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
// This is a common error for `Create*` RPCs, which are often
// LROs. It is less applicable to `BatchRecognize` in practice.
let status = rpc::model::Status::default()
.set_code(gax::error::rpc::Code::AlreadyExists as i32)
.set_message("resource already exists");
make_failed_operation(status)
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn polling_loop_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Operation was aborted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25]);
assert!(result.billed_duration.is_err());
Ok(())
}
}
For manual polling, an error starting an LRO is returned via the completed branch. This ends the polling loop.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Error(status.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn error_starting_lro() -> Result<()> {
let mut mock = MockSpeech::new();
mock.expect_batch_recognize().return_once(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Resource exhausted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the the final result.
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn lro_ending_in_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
// This is a common error for `Create*` RPCs, which are often
// LROs. It is less applicable to `BatchRecognize` in practice.
let status = rpc::model::Status::default()
.set_code(gax::error::rpc::Code::AlreadyExists as i32)
.set_message("resource already exists");
make_failed_operation(status)
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn polling_loop_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Operation was aborted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25]);
assert!(result.billed_duration.is_err());
Ok(())
}
}
Simulating an LRO resulting in an error
If you need to simulate an LRO resulting in an error, after intermediate
metadata is returned, we need to return the error in the final
longrunning::model::Operation
.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Error(status.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn error_starting_lro() -> Result<()> {
let mut mock = MockSpeech::new();
mock.expect_batch_recognize().return_once(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Resource exhausted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the the final result.
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn lro_ending_in_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
// This is a common error for `Create*` RPCs, which are often
// LROs. It is less applicable to `BatchRecognize` in practice.
let status = rpc::model::Status::default()
.set_code(gax::error::rpc::Code::AlreadyExists as i32)
.set_message("resource already exists");
make_failed_operation(status)
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn polling_loop_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Operation was aborted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25]);
assert!(result.billed_duration.is_err());
Ok(())
}
}
We set our expectations to return the Operation
from get_operation
as
before.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Error(status.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn error_starting_lro() -> Result<()> {
let mut mock = MockSpeech::new();
mock.expect_batch_recognize().return_once(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Resource exhausted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the the final result.
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn lro_ending_in_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
// This is a common error for `Create*` RPCs, which are often
// LROs. It is less applicable to `BatchRecognize` in practice.
let status = rpc::model::Status::default()
.set_code(gax::error::rpc::Code::AlreadyExists as i32)
.set_message("resource already exists");
make_failed_operation(status)
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn polling_loop_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Operation was aborted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25]);
assert!(result.billed_duration.is_err());
Ok(())
}
}
An LRO ending in an error will be returned via the completed branch. This ends the polling loop.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Error(status.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn error_starting_lro() -> Result<()> {
let mut mock = MockSpeech::new();
mock.expect_batch_recognize().return_once(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Resource exhausted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the the final result.
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn lro_ending_in_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
// This is a common error for `Create*` RPCs, which are often
// LROs. It is less applicable to `BatchRecognize` in practice.
let status = rpc::model::Status::default()
.set_code(gax::error::rpc::Code::AlreadyExists as i32)
.set_message("resource already exists");
make_failed_operation(status)
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn polling_loop_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Operation was aborted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25]);
assert!(result.billed_duration.is_err());
Ok(())
}
}
Simulating a polling error
Polling loops can also exit because the polling policy has been exhausted. When this happens, the client library can not say definitively whether the LRO has completed or not.
If your application has custom logic to deal with this case, we can exercise it
by returning an error from the get_operation
expectation.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Error(status.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn error_starting_lro() -> Result<()> {
let mut mock = MockSpeech::new();
mock.expect_batch_recognize().return_once(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Resource exhausted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the the final result.
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn lro_ending_in_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
// This is a common error for `Create*` RPCs, which are often
// LROs. It is less applicable to `BatchRecognize` in practice.
let status = rpc::model::Status::default()
.set_code(gax::error::rpc::Code::AlreadyExists as i32)
.set_message("resource already exists");
make_failed_operation(status)
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn polling_loop_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Operation was aborted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25]);
assert!(result.billed_duration.is_err());
Ok(())
}
}
An LRO ending with a polling error will be returned via the polling error branch.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Error(status.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn error_starting_lro() -> Result<()> {
let mut mock = MockSpeech::new();
mock.expect_batch_recognize().return_once(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Resource exhausted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the the final result.
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn lro_ending_in_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
// This is a common error for `Create*` RPCs, which are often
// LROs. It is less applicable to `BatchRecognize` in practice.
let status = rpc::model::Status::default()
.set_code(gax::error::rpc::Code::AlreadyExists as i32)
.set_message("resource already exists");
make_failed_operation(status)
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn polling_loop_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Operation was aborted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25]);
assert!(result.billed_duration.is_err());
Ok(())
}
}
Automatic polling - Full test
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::Operation;
use longrunning::model::operation::Result as OperationResult;
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse};
// Example application code that is under test
mod my_application {
use super::*;
// An example application function that automatically polls.
//
// It starts an LRO, awaits the result, and processes it.
pub async fn my_automatic_poller(
client: &speech::client::Speech,
project_id: &str,
) -> Result<Option<wkt::Duration>> {
use google_cloud_lro::Poller;
client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller()
.until_done()
.await
.map(|r| r.total_billed_duration)
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(response: &BatchRecognizeResponse) -> Result<Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn automatic_polling() -> Result<()> {
// Create a mock, and set expectations on it.
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.return_once(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which automatically polls.
let billed_duration = my_automatic_poller(&client, "my-project").await?;
// Verify the final result of the LRO.
assert_eq!(billed_duration, expected_duration());
Ok(())
}
}
Manual polling with intermediate metadata - Full test
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, BatchRecognizeResponse, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn expected_duration() -> Option<wkt::Duration> {
Some(wkt::Duration::clamp(100, 0))
}
fn expected_response() -> BatchRecognizeResponse {
BatchRecognizeResponse::new().set_or_clear_total_billed_duration(expected_duration())
}
fn make_finished_operation(
response: &BatchRecognizeResponse,
) -> Result<gax::response::Response<Operation>> {
let any = wkt::Any::from_msg(response).expect("test message should succeed");
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Response(any.into()));
Ok(Response::from(operation))
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
#[tokio::test]
async fn manual_polling_with_metadata() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_finished_operation(&expected_response()));
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert_eq!(result.billed_duration?, expected_duration());
Ok(())
}
}
Simulating errors - Full tests
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Examples showing how to simulate LROs in tests.
use gax::Result;
use gax::response::Response;
use google_cloud_gax as gax;
use google_cloud_longrunning as longrunning;
use google_cloud_rpc as rpc;
use google_cloud_speech_v2 as speech;
use google_cloud_wkt as wkt;
use longrunning::model::operation::Result as OperationResult;
use longrunning::model::{GetOperationRequest, Operation};
use speech::model::{BatchRecognizeRequest, OperationMetadata};
// Example application code that is under test
mod my_application {
use super::*;
pub struct BatchRecognizeResult {
pub progress_updates: Vec<i32>,
pub billed_duration: Result<Option<wkt::Duration>>,
}
// An example application function that manually polls.
//
// It starts an LRO. It consolidates the polling results, whether full or
// partial.
//
// In this case, it is the `BatchRecognize` RPC. If we get a partial update,
// we extract the `progress_percent` field. If we get a final result, we
// extract the `total_billed_duration` field.
pub async fn my_manual_poller(
client: &speech::client::Speech,
project_id: &str,
) -> BatchRecognizeResult {
use google_cloud_lro::{Poller, PollingResult};
let mut progress_updates = Vec::new();
let mut poller = client
.batch_recognize()
.set_recognizer(format!(
"projects/{project_id}/locations/global/recognizers/_"
))
.poller();
while let Some(p) = poller.poll().await {
match p {
PollingResult::Completed(r) => {
let billed_duration = r.map(|r| r.total_billed_duration);
return BatchRecognizeResult {
progress_updates,
billed_duration,
};
}
PollingResult::InProgress(m) => {
if let Some(metadata) = m {
// This is a silly application. Your application likely
// performs some task immediately with the partial
// update, instead of storing it for after the operation
// has completed.
progress_updates.push(metadata.progress_percent);
}
}
PollingResult::PollingError(e) => {
return BatchRecognizeResult {
progress_updates,
billed_duration: Err(e),
};
}
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// We can only get here if `poll()` returns `None`, but it only returns
// `None` after it returned `PollingResult::Completed`. Therefore this
// is never reached.
unreachable!("loop should exit via the `Completed` branch.");
}
}
#[cfg(test)]
mod tests {
use super::my_application::*;
use super::*;
mockall::mock! {
#[derive(Debug)]
Speech {}
impl speech::stub::Speech for Speech {
async fn batch_recognize(&self, req: BatchRecognizeRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
async fn get_operation(&self, req: GetOperationRequest, _options: gax::options::RequestOptions) -> Result<Response<Operation>>;
}
}
fn make_partial_operation(progress: i32) -> Result<Response<Operation>> {
let metadata = OperationMetadata::new().set_progress_percent(progress);
let any = wkt::Any::from_msg(&metadata).expect("test message should succeed");
let operation = Operation::new().set_metadata(any);
Ok(Response::from(operation))
}
fn make_failed_operation(status: rpc::model::Status) -> Result<Response<Operation>> {
let operation = Operation::new()
.set_done(true)
.set_result(OperationResult::Error(status.into()));
Ok(Response::from(operation))
}
#[tokio::test]
async fn error_starting_lro() -> Result<()> {
let mut mock = MockSpeech::new();
mock.expect_batch_recognize().return_once(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Resource exhausted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the the final result.
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn lro_ending_in_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(50));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(75));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
// This is a common error for `Create*` RPCs, which are often
// LROs. It is less applicable to `BatchRecognize` in practice.
let status = rpc::model::Status::default()
.set_code(gax::error::rpc::Code::AlreadyExists as i32)
.set_message("resource already exists");
make_failed_operation(status)
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25, 50, 75]);
assert!(result.billed_duration.is_err());
Ok(())
}
#[tokio::test]
async fn polling_loop_error() -> Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock = MockSpeech::new();
mock.expect_batch_recognize()
.once()
.in_sequence(&mut seq)
.returning(|_, _| make_partial_operation(25));
mock.expect_get_operation()
.once()
.in_sequence(&mut seq)
.returning(|_, _| {
use gax::error::Error;
use gax::error::rpc::{Code, Status};
let status = Status::default()
.set_code(Code::Aborted)
.set_message("Operation was aborted");
Err(Error::service(status))
});
// Create a client, implemented by our mock.
let client = speech::client::Speech::from_stub(mock);
// Call our function which manually polls.
let result = my_manual_poller(&client, "my-project").await;
// Verify the partial metadata updates, and the final result.
assert_eq!(result.progress_updates, [25]);
assert!(result.billed_duration.is_err());
Ok(())
}
}