Push data on object writes
The client API to write Cloud Storage objects pulls the payload from a type provided by the application. Some applications generate the payload in a thread and would rather "push" the object payload to the service.
This guide shows you how to write an object to Cloud Storage using a push data source.
Prerequisites
The guide assumes you have an existing Google Cloud project with billing enabled, and a Cloud Storage bucket in that project.
Add the client library as a dependency
cargo add google-cloud-storage
Convert a queue to a StreamingSource
The key idea is to use a queue to separate the task pushing new data from the task pulling the payload. This tutorial uses a Tokio mpsc queue, but you can use any queue that integrates with Tokio's async runtime.
First wrap the receiver in our own type:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}
Then implement the trait required by the Google Cloud client library:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}
In this tutorial you write the rest of the code in a function that accepts the bucket and object name as parameters:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}
// ... code goes here ...
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}
Initialize a client:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}
Create a queue, obtaining the receiver and sender:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}
Use the client to write an object with the data received from this queue. Note
that we do not await
the future created in the write_object()
method.
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}
Create a task to process the queue and write the data in the background:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}
In the main task, send some data to write:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}
Once you have finished sending the data, drop the sender to close the sending side of the queue:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}
Now you can wait for the task to finish and extract the result:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}
Full program
Putting all these steps together you get:
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use google_cloud_storage::streaming_source::StreamingSource;
use tokio::sync::mpsc::{self, Receiver};
#[derive(Debug)]
struct QueueSource(Receiver<bytes::Bytes>);
impl StreamingSource for QueueSource {
type Error = std::convert::Infallible;
async fn next(&mut self) -> Option<Result<bytes::Bytes, Self::Error>> {
self.0.recv().await.map(Ok)
}
}
pub async fn queue(bucket_name: &str, object_name: &str) -> anyhow::Result<()> {
use google_cloud_storage::client::Storage;
let client = Storage::builder().build().await?;
let (sender, receiver) = mpsc::channel::<bytes::Bytes>(32);
let upload = client
.write_object(bucket_name, object_name, QueueSource(receiver))
.send_buffered();
let task = tokio::spawn(upload);
for _ in 0..1000 {
let line = "I will not write funny examples in class\n";
sender
.send(bytes::Bytes::from_static(line.as_bytes()))
.await?;
}
drop(sender);
let object = task.await??;
println!("object successfully uploaded {object:?}");
Ok(())
}