Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Push data on object writes

The client API to write Cloud Storage objects pulls the payload from a type provided by the application. Some applications generate the payload in a thread and would rather "push" the object payload to the service.

This guide shows you how to write an object to Cloud Storage using a push data source.

Prerequisites

The guide assumes you have an existing Google Cloud project with billing enabled, and a Cloud Storage bucket in that project.

Add the client library as a dependency

cargo add google-cloud-storage

Convert a queue to a StreamingSource

The key idea is to use a queue to separate the task pushing new data from the task pulling the payload. This tutorial uses a Tokio mpsc queue, but you can use any queue that integrates with Tokio's async runtime.

First wrap the receiver in our own type:

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}

Then implement the trait required by the Google Cloud client library:

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}

In this tutorial you write the rest of the code in a function that accepts the bucket and object name as parameters:

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}
    // ... code goes here ...
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}

Initialize a client:

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}

Create a queue, obtaining the receiver and sender:

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}

Use the client to write an object with the data received from this queue. Note that we do not await the future returned by send_buffered() yet; write_object() only starts building the request.

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}

Create a task to process the queue and write the data in the background:

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}

In the main task, send some data to write:

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}

Once you have finished sending the data, drop the sender to close the sending side of the queue:

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}

Now you can wait for the task to finish and extract the result:

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}

Full program

Putting all these steps together you get:

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
/// Adapts the receiving side of a Tokio `mpsc` queue into a `StreamingSource`.
///
/// Each `bytes::Bytes` value pushed into the queue becomes one chunk of the
/// object payload.
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);

impl StreamingSource for QueueSource {
    /// Receiving from the queue cannot fail, so the source never yields an error.
    type Error = std::convert::Infallible;

    /// Waits for the next chunk in the queue.
    ///
    /// Returns `None` once the queue is closed and drained, signalling the end
    /// of the payload.
    async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
        let chunk = self.0.recv().await?;
        Some(Ok(chunk))
    }
}

/// Writes an object to Cloud Storage by pushing its payload through a queue.
///
/// The upload runs in a background task that pulls chunks from a Tokio `mpsc`
/// channel wrapped in a `QueueSource`, while this function pushes 1000 lines
/// of text into the channel.
///
/// # Parameters
///
/// * `bucket_name` - the bucket to write to, passed unchanged to `write_object()`.
/// * `object_name` - the name of the object to write.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the background task
/// cannot be joined, or if the upload fails. When the queue closes before all
/// the data is sent, the upload's own error is reported in preference to the
/// less informative "channel closed" error.
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
    use google_cloud_storage::client::Storage;
    let client = Storage::builder().build().await?;

    // Bounded queue: `send()` waits whenever 32 chunks are already pending.
    let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
    // Do not await here: the upload only finishes once the sender is dropped.
    let upload = client
        .write_object(bucket_name, object_name, QueueSource(receiver))
        .send_buffered();
    let task = tokio::spawn(upload);

    let mut send_error = None;
    for _ in 0..1000 {
        let line = "I will not write funny examples in class\n";
        // `send()` only fails once the receiver is gone, which means the
        // upload already stopped. Stop pushing and let the upload's result,
        // checked below, explain why.
        if let Err(e) = sender.send(bytes::Bytes::from_static(line.as_bytes())).await {
            send_error = Some(e);
            break;
        }
    }
    // Closing the sending side makes `QueueSource::next()` return `None`.
    drop(sender);
    let object = task.await??;
    // The upload reported success without consuming all the data: surface the
    // send failure instead of claiming the object is complete.
    if let Some(e) = send_error {
        return Err(e.into());
    }
    println!("object successfully uploaded {object:?}");

    Ok(())
}