Use errors to terminate object writes
In this guide you will learn how to use errors and custom data sources to terminate an object write before it is finalized. This is useful when applications want to stop the client library from finalizing the object creation if there is some error condition.
Prerequisites
The guide assumes you have an existing Google Cloud project with billing enabled, and a Cloud Storage bucket in that project.
The tutorial assumes you are familiar with the basics of using the client library. If not, you may want to read the quickstart guide first.
Add the client library as a dependency
cargo add google-cloud-storage
Overview
The client library creates objects from any type implementing the `StreamingSource` trait. The client library pulls data from implementations of the trait. The library terminates the object write on the first error.
In this guide you will build a custom implementation of `StreamingSource` that returns some data and then stops on an error. You will verify that an object write using this custom data source returns an error.
Create a custom error type
To terminate an object write without finalizing it, your StreamingSource must return an error. In this example you will create a simple error type. In your application code you can use any existing error type:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Error reported by the example data source to abort an object write.
///
/// Any type implementing [`std::error::Error`] can play this role; the enum
/// only exists to keep the example self-contained.
#[derive(Debug)]
pub enum MyError {
    /// The failure the example data source returns on purpose.
    ExpectedProblem,
    /// A second variant; none of the code shown in this guide constructs it.
    OhNoes,
}

impl std::fmt::Display for MyError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ExpectedProblem => "this kind of thing happens",
            Self::OhNoes => "oh noes! something terrible happened",
        };
        f.write_str(message)
    }
}

// `Error` only needs `Debug` and `Display`; its default methods are kept.
impl std::error::Error for MyError {}
/// Synthetic data source for the example.
///
/// The wrapped counter records how many times `next()` has been called; it
/// starts at zero when the value is built through `Default`.
#[derive(Debug, Default)]
struct MySource(u32);

impl google_cloud_storage::streaming_source::StreamingSource for MySource {
    type Error = MyError;

    /// Produces the next chunk of data, or the error that stops the write.
    ///
    /// Starting from the default counter of zero, calls 1 through 41 yield a
    /// line of text, call 42 yields `MyError::ExpectedProblem`, and every
    /// later call yields `None`.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        use std::cmp::Ordering;

        self.0 += 1;
        let n = self.0;
        match n.cmp(&42) {
            Ordering::Less => {
                let line = format!("test data for the example {n}\n");
                Some(Ok(bytes::Bytes::from_owner(line)))
            }
            Ordering::Equal => Some(Err(MyError::ExpectedProblem)),
            Ordering::Greater => None,
        }
    }
}
pub async fn attempt_upload(bucket_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let upload = client
.write_object(bucket_name, "expect-error", MySource::default())
.send_buffered()
.await;
println!("Upload result {upload:?}");
let err = upload.expect_err("the source is supposed to terminate the upload");
assert!(err.is_serialization(), "{err:?}");
use std::error::Error as _;
assert!(err.source().is_some_and(|e| e.is::<MyError>()), "{err:?}");
Ok(())
}
The client library requires that your custom error type implements the standard Error trait:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Error reported by the example data source to abort an object write.
///
/// Any type implementing [`std::error::Error`] can play this role; the enum
/// only exists to keep the example self-contained.
#[derive(Debug)]
pub enum MyError {
    /// The failure the example data source returns on purpose.
    ExpectedProblem,
    /// A second variant; none of the code shown in this guide constructs it.
    OhNoes,
}

impl std::fmt::Display for MyError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ExpectedProblem => "this kind of thing happens",
            Self::OhNoes => "oh noes! something terrible happened",
        };
        f.write_str(message)
    }
}

// `Error` only needs `Debug` and `Display`; its default methods are kept.
impl std::error::Error for MyError {}
/// Synthetic data source for the example.
///
/// The wrapped counter records how many times `next()` has been called; it
/// starts at zero when the value is built through `Default`.
#[derive(Debug, Default)]
struct MySource(u32);

impl google_cloud_storage::streaming_source::StreamingSource for MySource {
    type Error = MyError;

    /// Produces the next chunk of data, or the error that stops the write.
    ///
    /// Starting from the default counter of zero, calls 1 through 41 yield a
    /// line of text, call 42 yields `MyError::ExpectedProblem`, and every
    /// later call yields `None`.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        use std::cmp::Ordering;

        self.0 += 1;
        let n = self.0;
        match n.cmp(&42) {
            Ordering::Less => {
                let line = format!("test data for the example {n}\n");
                Some(Ok(bytes::Bytes::from_owner(line)))
            }
            Ordering::Equal => Some(Err(MyError::ExpectedProblem)),
            Ordering::Greater => None,
        }
    }
}
pub async fn attempt_upload(bucket_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let upload = client
.write_object(bucket_name, "expect-error", MySource::default())
.send_buffered()
.await;
println!("Upload result {upload:?}");
let err = upload.expect_err("the source is supposed to terminate the upload");
assert!(err.is_serialization(), "{err:?}");
use std::error::Error as _;
assert!(err.source().is_some_and(|e| e.is::<MyError>()), "{err:?}");
Ok(())
}
As you may recall, that requires implementing Display too:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Error reported by the example data source to abort an object write.
///
/// Any type implementing [`std::error::Error`] can play this role; the enum
/// only exists to keep the example self-contained.
#[derive(Debug)]
pub enum MyError {
    /// The failure the example data source returns on purpose.
    ExpectedProblem,
    /// A second variant; none of the code shown in this guide constructs it.
    OhNoes,
}

impl std::fmt::Display for MyError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ExpectedProblem => "this kind of thing happens",
            Self::OhNoes => "oh noes! something terrible happened",
        };
        f.write_str(message)
    }
}

// `Error` only needs `Debug` and `Display`; its default methods are kept.
impl std::error::Error for MyError {}
/// Synthetic data source for the example.
///
/// The wrapped counter records how many times `next()` has been called; it
/// starts at zero when the value is built through `Default`.
#[derive(Debug, Default)]
struct MySource(u32);

impl google_cloud_storage::streaming_source::StreamingSource for MySource {
    type Error = MyError;

    /// Produces the next chunk of data, or the error that stops the write.
    ///
    /// Starting from the default counter of zero, calls 1 through 41 yield a
    /// line of text, call 42 yields `MyError::ExpectedProblem`, and every
    /// later call yields `None`.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        use std::cmp::Ordering;

        self.0 += 1;
        let n = self.0;
        match n.cmp(&42) {
            Ordering::Less => {
                let line = format!("test data for the example {n}\n");
                Some(Ok(bytes::Bytes::from_owner(line)))
            }
            Ordering::Equal => Some(Err(MyError::ExpectedProblem)),
            Ordering::Greater => None,
        }
    }
}
pub async fn attempt_upload(bucket_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let upload = client
.write_object(bucket_name, "expect-error", MySource::default())
.send_buffered()
.await;
println!("Upload result {upload:?}");
let err = upload.expect_err("the source is supposed to terminate the upload");
assert!(err.is_serialization(), "{err:?}");
use std::error::Error as _;
assert!(err.source().is_some_and(|e| e.is::<MyError>()), "{err:?}");
Ok(())
}
Create a custom StreamingSource
Create a type that generates the data for your object. In this example you will generate synthetic data using a counter:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Error reported by the example data source to abort an object write.
///
/// Any type implementing [`std::error::Error`] can play this role; the enum
/// only exists to keep the example self-contained.
#[derive(Debug)]
pub enum MyError {
    /// The failure the example data source returns on purpose.
    ExpectedProblem,
    /// A second variant; none of the code shown in this guide constructs it.
    OhNoes,
}

impl std::fmt::Display for MyError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ExpectedProblem => "this kind of thing happens",
            Self::OhNoes => "oh noes! something terrible happened",
        };
        f.write_str(message)
    }
}

// `Error` only needs `Debug` and `Display`; its default methods are kept.
impl std::error::Error for MyError {}
/// Synthetic data source for the example.
///
/// The wrapped counter records how many times `next()` has been called; it
/// starts at zero when the value is built through `Default`.
#[derive(Debug, Default)]
struct MySource(u32);

impl google_cloud_storage::streaming_source::StreamingSource for MySource {
    type Error = MyError;

    /// Produces the next chunk of data, or the error that stops the write.
    ///
    /// Starting from the default counter of zero, calls 1 through 41 yield a
    /// line of text, call 42 yields `MyError::ExpectedProblem`, and every
    /// later call yields `None`.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        use std::cmp::Ordering;

        self.0 += 1;
        let n = self.0;
        match n.cmp(&42) {
            Ordering::Less => {
                let line = format!("test data for the example {n}\n");
                Some(Ok(bytes::Bytes::from_owner(line)))
            }
            Ordering::Equal => Some(Err(MyError::ExpectedProblem)),
            Ordering::Greater => None,
        }
    }
}
pub async fn attempt_upload(bucket_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let upload = client
.write_object(bucket_name, "expect-error", MySource::default())
.send_buffered()
.await;
println!("Upload result {upload:?}");
let err = upload.expect_err("the source is supposed to terminate the upload");
assert!(err.is_serialization(), "{err:?}");
use std::error::Error as _;
assert!(err.source().is_some_and(|e| e.is::<MyError>()), "{err:?}");
Ok(())
}
Implement the streaming source trait for your type:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Error reported by the example data source to abort an object write.
///
/// Any type implementing [`std::error::Error`] can play this role; the enum
/// only exists to keep the example self-contained.
#[derive(Debug)]
pub enum MyError {
    /// The failure the example data source returns on purpose.
    ExpectedProblem,
    /// A second variant; none of the code shown in this guide constructs it.
    OhNoes,
}

impl std::fmt::Display for MyError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ExpectedProblem => "this kind of thing happens",
            Self::OhNoes => "oh noes! something terrible happened",
        };
        f.write_str(message)
    }
}

// `Error` only needs `Debug` and `Display`; its default methods are kept.
impl std::error::Error for MyError {}
/// Synthetic data source for the example.
///
/// The wrapped counter records how many times `next()` has been called; it
/// starts at zero when the value is built through `Default`.
#[derive(Debug, Default)]
struct MySource(u32);

impl google_cloud_storage::streaming_source::StreamingSource for MySource {
    type Error = MyError;

    /// Produces the next chunk of data, or the error that stops the write.
    ///
    /// Starting from the default counter of zero, calls 1 through 41 yield a
    /// line of text, call 42 yields `MyError::ExpectedProblem`, and every
    /// later call yields `None`.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        use std::cmp::Ordering;

        self.0 += 1;
        let n = self.0;
        match n.cmp(&42) {
            Ordering::Less => {
                let line = format!("test data for the example {n}\n");
                Some(Ok(bytes::Bytes::from_owner(line)))
            }
            Ordering::Equal => Some(Err(MyError::ExpectedProblem)),
            Ordering::Greater => None,
        }
    }
}
pub async fn attempt_upload(bucket_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let upload = client
.write_object(bucket_name, "expect-error", MySource::default())
.send_buffered()
.await;
println!("Upload result {upload:?}");
let err = upload.expect_err("the source is supposed to terminate the upload");
assert!(err.is_serialization(), "{err:?}");
use std::error::Error as _;
assert!(err.source().is_some_and(|e| e.is::<MyError>()), "{err:?}");
Ok(())
}
// ... the associated error type and the `next()` function are covered in more detail below ...
Define the error type:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Error reported by the example data source to abort an object write.
///
/// Any type implementing [`std::error::Error`] can play this role; the enum
/// only exists to keep the example self-contained.
#[derive(Debug)]
pub enum MyError {
    /// The failure the example data source returns on purpose.
    ExpectedProblem,
    /// A second variant; none of the code shown in this guide constructs it.
    OhNoes,
}

impl std::fmt::Display for MyError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ExpectedProblem => "this kind of thing happens",
            Self::OhNoes => "oh noes! something terrible happened",
        };
        f.write_str(message)
    }
}

// `Error` only needs `Debug` and `Display`; its default methods are kept.
impl std::error::Error for MyError {}
/// Synthetic data source for the example.
///
/// The wrapped counter records how many times `next()` has been called; it
/// starts at zero when the value is built through `Default`.
#[derive(Debug, Default)]
struct MySource(u32);

impl google_cloud_storage::streaming_source::StreamingSource for MySource {
    type Error = MyError;

    /// Produces the next chunk of data, or the error that stops the write.
    ///
    /// Starting from the default counter of zero, calls 1 through 41 yield a
    /// line of text, call 42 yields `MyError::ExpectedProblem`, and every
    /// later call yields `None`.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        use std::cmp::Ordering;

        self.0 += 1;
        let n = self.0;
        match n.cmp(&42) {
            Ordering::Less => {
                let line = format!("test data for the example {n}\n");
                Some(Ok(bytes::Bytes::from_owner(line)))
            }
            Ordering::Equal => Some(Err(MyError::ExpectedProblem)),
            Ordering::Greater => None,
        }
    }
}
pub async fn attempt_upload(bucket_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let upload = client
.write_object(bucket_name, "expect-error", MySource::default())
.send_buffered()
.await;
println!("Upload result {upload:?}");
let err = upload.expect_err("the source is supposed to terminate the upload");
assert!(err.is_serialization(), "{err:?}");
use std::error::Error as _;
assert!(err.source().is_some_and(|e| e.is::<MyError>()), "{err:?}");
Ok(())
}
And implement the main function in this trait. Note how this function will (eventually) return the error type you defined above:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Error reported by the example data source to abort an object write.
///
/// Any type implementing [`std::error::Error`] can play this role; the enum
/// only exists to keep the example self-contained.
#[derive(Debug)]
pub enum MyError {
    /// The failure the example data source returns on purpose.
    ExpectedProblem,
    /// A second variant; none of the code shown in this guide constructs it.
    OhNoes,
}

impl std::fmt::Display for MyError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ExpectedProblem => "this kind of thing happens",
            Self::OhNoes => "oh noes! something terrible happened",
        };
        f.write_str(message)
    }
}

// `Error` only needs `Debug` and `Display`; its default methods are kept.
impl std::error::Error for MyError {}
/// Synthetic data source for the example.
///
/// The wrapped counter records how many times `next()` has been called; it
/// starts at zero when the value is built through `Default`.
#[derive(Debug, Default)]
struct MySource(u32);

impl google_cloud_storage::streaming_source::StreamingSource for MySource {
    type Error = MyError;

    /// Produces the next chunk of data, or the error that stops the write.
    ///
    /// Starting from the default counter of zero, calls 1 through 41 yield a
    /// line of text, call 42 yields `MyError::ExpectedProblem`, and every
    /// later call yields `None`.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        use std::cmp::Ordering;

        self.0 += 1;
        let n = self.0;
        match n.cmp(&42) {
            Ordering::Less => {
                let line = format!("test data for the example {n}\n");
                Some(Ok(bytes::Bytes::from_owner(line)))
            }
            Ordering::Equal => Some(Err(MyError::ExpectedProblem)),
            Ordering::Greater => None,
        }
    }
}
pub async fn attempt_upload(bucket_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let upload = client
.write_object(bucket_name, "expect-error", MySource::default())
.send_buffered()
.await;
println!("Upload result {upload:?}");
let err = upload.expect_err("the source is supposed to terminate the upload");
assert!(err.is_serialization(), "{err:?}");
use std::error::Error as _;
assert!(err.source().is_some_and(|e| e.is::<MyError>()), "{err:?}");
Ok(())
}
Create the object
You will need a client to interact with Cloud Storage:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Error reported by the example data source to abort an object write.
///
/// Any type implementing [`std::error::Error`] can play this role; the enum
/// only exists to keep the example self-contained.
#[derive(Debug)]
pub enum MyError {
    /// The failure the example data source returns on purpose.
    ExpectedProblem,
    /// A second variant; none of the code shown in this guide constructs it.
    OhNoes,
}

impl std::fmt::Display for MyError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ExpectedProblem => "this kind of thing happens",
            Self::OhNoes => "oh noes! something terrible happened",
        };
        f.write_str(message)
    }
}

// `Error` only needs `Debug` and `Display`; its default methods are kept.
impl std::error::Error for MyError {}
/// Synthetic data source for the example.
///
/// The wrapped counter records how many times `next()` has been called; it
/// starts at zero when the value is built through `Default`.
#[derive(Debug, Default)]
struct MySource(u32);

impl google_cloud_storage::streaming_source::StreamingSource for MySource {
    type Error = MyError;

    /// Produces the next chunk of data, or the error that stops the write.
    ///
    /// Starting from the default counter of zero, calls 1 through 41 yield a
    /// line of text, call 42 yields `MyError::ExpectedProblem`, and every
    /// later call yields `None`.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        use std::cmp::Ordering;

        self.0 += 1;
        let n = self.0;
        match n.cmp(&42) {
            Ordering::Less => {
                let line = format!("test data for the example {n}\n");
                Some(Ok(bytes::Bytes::from_owner(line)))
            }
            Ordering::Equal => Some(Err(MyError::ExpectedProblem)),
            Ordering::Greater => None,
        }
    }
}
pub async fn attempt_upload(bucket_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let upload = client
.write_object(bucket_name, "expect-error", MySource::default())
.send_buffered()
.await;
println!("Upload result {upload:?}");
let err = upload.expect_err("the source is supposed to terminate the upload");
assert!(err.is_serialization(), "{err:?}");
use std::error::Error as _;
assert!(err.source().is_some_and(|e| e.is::<MyError>()), "{err:?}");
Ok(())
}
Use the custom type to create the object:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Error reported by the example data source to abort an object write.
///
/// Any type implementing [`std::error::Error`] can play this role; the enum
/// only exists to keep the example self-contained.
#[derive(Debug)]
pub enum MyError {
    /// The failure the example data source returns on purpose.
    ExpectedProblem,
    /// A second variant; none of the code shown in this guide constructs it.
    OhNoes,
}

impl std::fmt::Display for MyError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ExpectedProblem => "this kind of thing happens",
            Self::OhNoes => "oh noes! something terrible happened",
        };
        f.write_str(message)
    }
}

// `Error` only needs `Debug` and `Display`; its default methods are kept.
impl std::error::Error for MyError {}
/// Synthetic data source for the example.
///
/// The wrapped counter records how many times `next()` has been called; it
/// starts at zero when the value is built through `Default`.
#[derive(Debug, Default)]
struct MySource(u32);

impl google_cloud_storage::streaming_source::StreamingSource for MySource {
    type Error = MyError;

    /// Produces the next chunk of data, or the error that stops the write.
    ///
    /// Starting from the default counter of zero, calls 1 through 41 yield a
    /// line of text, call 42 yields `MyError::ExpectedProblem`, and every
    /// later call yields `None`.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        use std::cmp::Ordering;

        self.0 += 1;
        let n = self.0;
        match n.cmp(&42) {
            Ordering::Less => {
                let line = format!("test data for the example {n}\n");
                Some(Ok(bytes::Bytes::from_owner(line)))
            }
            Ordering::Equal => Some(Err(MyError::ExpectedProblem)),
            Ordering::Greater => None,
        }
    }
}
pub async fn attempt_upload(bucket_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let upload = client
.write_object(bucket_name, "expect-error", MySource::default())
.send_buffered()
.await;
println!("Upload result {upload:?}");
let err = upload.expect_err("the source is supposed to terminate the upload");
assert!(err.is_serialization(), "{err:?}");
use std::error::Error as _;
assert!(err.source().is_some_and(|e| e.is::<MyError>()), "{err:?}");
Ok(())
}
As expected, this object write fails. You can inspect the error details:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Error reported by the example data source to abort an object write.
///
/// Any type implementing [`std::error::Error`] can play this role; the enum
/// only exists to keep the example self-contained.
#[derive(Debug)]
pub enum MyError {
    /// The failure the example data source returns on purpose.
    ExpectedProblem,
    /// A second variant; none of the code shown in this guide constructs it.
    OhNoes,
}

impl std::fmt::Display for MyError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ExpectedProblem => "this kind of thing happens",
            Self::OhNoes => "oh noes! something terrible happened",
        };
        f.write_str(message)
    }
}

// `Error` only needs `Debug` and `Display`; its default methods are kept.
impl std::error::Error for MyError {}
/// Synthetic data source for the example.
///
/// The wrapped counter records how many times `next()` has been called; it
/// starts at zero when the value is built through `Default`.
#[derive(Debug, Default)]
struct MySource(u32);

impl google_cloud_storage::streaming_source::StreamingSource for MySource {
    type Error = MyError;

    /// Produces the next chunk of data, or the error that stops the write.
    ///
    /// Starting from the default counter of zero, calls 1 through 41 yield a
    /// line of text, call 42 yields `MyError::ExpectedProblem`, and every
    /// later call yields `None`.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        use std::cmp::Ordering;

        self.0 += 1;
        let n = self.0;
        match n.cmp(&42) {
            Ordering::Less => {
                let line = format!("test data for the example {n}\n");
                Some(Ok(bytes::Bytes::from_owner(line)))
            }
            Ordering::Equal => Some(Err(MyError::ExpectedProblem)),
            Ordering::Greater => None,
        }
    }
}
pub async fn attempt_upload(bucket_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let upload = client
.write_object(bucket_name, "expect-error", MySource::default())
.send_buffered()
.await;
println!("Upload result {upload:?}");
let err = upload.expect_err("the source is supposed to terminate the upload");
assert!(err.is_serialization(), "{err:?}");
use std::error::Error as _;
assert!(err.source().is_some_and(|e| e.is::<MyError>()), "{err:?}");
Ok(())
}
Next Steps
Full Program
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Error reported by the example data source to abort an object write.
///
/// Any type implementing [`std::error::Error`] can play this role; the enum
/// only exists to keep the example self-contained.
#[derive(Debug)]
pub enum MyError {
    /// The failure the example data source returns on purpose.
    ExpectedProblem,
    /// A second variant; none of the code shown in this guide constructs it.
    OhNoes,
}

impl std::fmt::Display for MyError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ExpectedProblem => "this kind of thing happens",
            Self::OhNoes => "oh noes! something terrible happened",
        };
        f.write_str(message)
    }
}

// `Error` only needs `Debug` and `Display`; its default methods are kept.
impl std::error::Error for MyError {}
/// Synthetic data source for the example.
///
/// The wrapped counter records how many times `next()` has been called; it
/// starts at zero when the value is built through `Default`.
#[derive(Debug, Default)]
struct MySource(u32);

impl google_cloud_storage::streaming_source::StreamingSource for MySource {
    type Error = MyError;

    /// Produces the next chunk of data, or the error that stops the write.
    ///
    /// Starting from the default counter of zero, calls 1 through 41 yield a
    /// line of text, call 42 yields `MyError::ExpectedProblem`, and every
    /// later call yields `None`.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        use std::cmp::Ordering;

        self.0 += 1;
        let n = self.0;
        match n.cmp(&42) {
            Ordering::Less => {
                let line = format!("test data for the example {n}\n");
                Some(Ok(bytes::Bytes::from_owner(line)))
            }
            Ordering::Equal => Some(Err(MyError::ExpectedProblem)),
            Ordering::Greater => None,
        }
    }
}
pub async fn attempt_upload(bucket_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let upload = client
.write_object(bucket_name, "expect-error", MySource::default())
.send_buffered()
.await;
println!("Upload result {upload:?}");
let err = upload.expect_err("the source is supposed to terminate the upload");
assert!(err.is_serialization(), "{err:?}");
use std::error::Error as _;
assert!(err.source().is_some_and(|e| e.is::<MyError>()), "{err:?}");
Ok(())
}