17 #include "google/cloud/spanner/internal/connection_impl.h" 18 #include "google/cloud/spanner/internal/retry_loop.h" 19 #include "google/cloud/spanner/internal/spanner_stub.h" 20 #include "google/cloud/spanner/internal/status_utils.h" 23 #include "google/cloud/internal/getenv.h" 25 #include <grpcpp/grpcpp.h> 34 std::vector<std::string> columns,
41 std::move(read_options),
46 std::string table,
KeySet keys,
47 std::vector<std::string> columns,
50 {internal::MakeSingleUseTransaction(std::move(transaction_options)),
54 std::move(read_options),
59 std::vector<std::string> columns,
61 return conn_->Read({std::move(transaction),
65 std::move(read_options),
70 return conn_->Read(internal::MakeReadParams(read_partition));
73 StatusOr<std::vector<ReadPartition>> Client::PartitionRead(
75 std::vector<std::string> columns,
ReadOptions read_options,
77 return conn_->PartitionRead({{std::move(transaction),
81 std::move(read_options),
88 return conn_->ExecuteQuery(
91 OverlayQueryOptions(opts),
98 return conn_->ExecuteQuery(
99 {internal::MakeSingleUseTransaction(std::move(transaction_options)),
100 std::move(statement),
101 OverlayQueryOptions(opts),
107 return conn_->ExecuteQuery({std::move(transaction),
108 std::move(statement),
109 OverlayQueryOptions(opts),
115 auto params = internal::MakeSqlParams(partition);
116 params.query_options = OverlayQueryOptions(opts);
117 return conn_->ExecuteQuery(std::move(params));
122 return conn_->ProfileQuery(
124 std::move(statement),
125 OverlayQueryOptions(opts),
132 return conn_->ProfileQuery(
133 {internal::MakeSingleUseTransaction(std::move(transaction_options)),
134 std::move(statement),
135 OverlayQueryOptions(opts),
142 return conn_->ProfileQuery({std::move(transaction),
143 std::move(statement),
144 OverlayQueryOptions(opts),
148 StatusOr<std::vector<QueryPartition>> Client::PartitionQuery(
151 return conn_->PartitionQuery(
152 {std::move(transaction), std::move(statement), partition_options});
158 return conn_->ExecuteDml({std::move(transaction),
159 std::move(statement),
160 OverlayQueryOptions(opts),
164 StatusOr<ProfileDmlResult> Client::ProfileDml(
Transaction transaction,
167 return conn_->ProfileDml({std::move(transaction),
168 std::move(statement),
169 OverlayQueryOptions(opts),
173 StatusOr<ExecutionPlan> Client::AnalyzeSql(
Transaction transaction,
176 return conn_->AnalyzeSql({std::move(transaction),
177 std::move(statement),
178 OverlayQueryOptions(opts),
182 StatusOr<BatchDmlResult> Client::ExecuteBatchDml(
183 Transaction transaction, std::vector<SqlStatement> statements) {
184 return conn_->ExecuteBatchDml(
185 {std::move(transaction), std::move(statements)});
188 StatusOr<CommitResult> Client::Commit(
189 std::function<StatusOr<Mutations>(
Transaction)>
const& mutator,
190 std::unique_ptr<TransactionRerunPolicy> rerun_policy,
191 std::unique_ptr<BackoffPolicy> backoff_policy) {
193 using RerunnablePolicy = internal::SafeTransactionRerun;
196 for (
int rerun = 0;; ++rerun) {
197 StatusOr<Mutations> mutations;
198 #if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS 201 mutations = mutator(txn);
202 #if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS 203 }
catch (RuntimeStatusError
const& error) {
205 Status status = error.status();
207 status = Status(StatusCode::kUnknown,
"OK Status thrown from mutator");
211 auto rb_status = Rollback(txn);
212 if (!RerunnablePolicy::IsOk(rb_status)) {
213 GCP_LOG(WARNING) <<
"Rollback() failure in Client::Commit(): " 214 << rb_status.message();
219 auto status = mutations.status();
220 if (RerunnablePolicy::IsOk(status)) {
221 auto result = Commit(txn, *mutations);
222 status = result.status();
223 if (!RerunnablePolicy::IsTransientFailure(status)) {
227 if (!RerunnablePolicy::IsTransientFailure(status)) {
228 auto rb_status = Rollback(txn);
229 if (!RerunnablePolicy::IsOk(rb_status)) {
230 GCP_LOG(WARNING) <<
"Rollback() failure in Client::Commit(): " 231 << rb_status.message();
237 if (!rerun_policy->OnFailure(status)) {
240 if (internal::IsSessionNotFound(status)) {
242 internal::Visit(txn, [](internal::SessionHolder& s,
243 google::spanner::v1::TransactionSelector
const&,
254 std::this_thread::sleep_for(backoff_policy->OnCompletion());
258 StatusOr<CommitResult> Client::Commit(
259 std::function<StatusOr<Mutations>(
Transaction)>
const& mutator) {
260 auto const rerun_maximum_duration = std::chrono::minutes(10);
261 auto default_commit_rerun_policy =
264 auto const backoff_initial_delay = std::chrono::milliseconds(100);
265 auto const backoff_maximum_delay = std::chrono::minutes(5);
266 auto const backoff_scaling = 2.0;
267 auto default_commit_backoff_policy =
272 return Commit(mutator, std::move(default_commit_rerun_policy),
273 std::move(default_commit_backoff_policy));
276 StatusOr<CommitResult> Client::Commit(
Mutations mutations) {
277 return Commit([&mutations](
Transaction const&) {
return mutations; });
282 return conn_->Commit({std::move(transaction), std::move(mutations)});
286 return conn_->Rollback({std::move(transaction)});
289 StatusOr<PartitionedDmlResult> Client::ExecutePartitionedDml(
291 return conn_->ExecutePartitionedDml({std::move(statement)});
302 static auto const*
const kOptimizerVersionEnvValue =
303 new auto(google::cloud::internal::GetEnv(
"SPANNER_OPTIMIZER_VERSION"));
313 }
else if (kOptimizerVersionEnvValue->has_value()) {
323 return MakeConnection(db, connection_options, std::move(session_pool_options),
324 internal::DefaultConnectionRetryPolicy(),
325 internal::DefaultConnectionBackoffPolicy());
331 std::unique_ptr<RetryPolicy> retry_policy,
332 std::unique_ptr<BackoffPolicy> backoff_policy) {
333 std::vector<std::shared_ptr<internal::SpannerStub>> stubs;
334 int num_channels = std::max(connection_options.num_channels(), 1);
335 stubs.reserve(num_channels);
336 for (
int channel_id = 0; channel_id < num_channels; ++channel_id) {
338 internal::CreateDefaultSpannerStub(connection_options, channel_id));
341 db, std::move(stubs), connection_options, std::move(session_pool_options),
342 std::move(retry_policy), std::move(backoff_policy));
optional< std::string > const & optimizer_version() const
Returns the optimizer version.
Options for "single-use", ReadOnly transactions, where Spanner chooses the read timestamp,...
Represents the stream of Rows and profile stats returned from spanner::Client::ProfileQuery().
std::vector< Mutation > Mutations
An ordered sequence of mutations to pass to Client::Commit() or return from the Client::Commit() muta...
Controls the session pool maintained by a spanner::Client.
The QueryPartition class is a regular type that represents a single slice of a parallel SQL read.
The KeySet class is a regular type that represents a collection of Keys.
google::cloud::internal::LimitedTimeRetryPolicy< google::cloud::Status, internal::SafeTransactionRerun > LimitedTimeTransactionRerunPolicy
A transaction rerun policy that limits the duration of the rerun loop.
Contains all the Cloud Spanner C++ client types and functions.
These QueryOptions allow users to configure features about how their SQL queries execute on the serv...
google::cloud::ConnectionOptions< ConnectionOptionsTraits > ConnectionOptions
The options for Cloud Spanner connections.
Transaction MakeReadWriteTransaction(Transaction::ReadWriteOptions opts={})
Create a read-write transaction configured with opts.
The ReadPartition class is a regular type that represents a single slice of a parallel Read operation...
google::cloud::internal::ExponentialBackoffPolicy ExponentialBackoffPolicy
A truncated exponential backoff policy with randomized periods.
#define SPANNER_CLIENT_NS
std::shared_ptr< Connection > MakeConnection(Database const &db, ConnectionOptions const &connection_options, SessionPoolOptions session_pool_options, std::unique_ptr< RetryPolicy > retry_policy, std::unique_ptr< BackoffPolicy > backoff_policy)
Returns a Connection object that can be used for interacting with Spanner.
QueryOptions & set_optimizer_version(optional< std::string > version)
Sets the optimizer version to the specified integer string.
The representation of a Cloud Spanner transaction.
This class identifies a Cloud Spanner Database.
Represents the stream of Rows returned from spanner::Client::Read() or spanner::Client::ExecuteQuery(...
Options passed to Client::PartitionRead or Client::PartitionQuery.
Options for ReadOnly transactions.
Options passed to Client::Read or Client::PartitionRead.
Represents a potentially parameterized SQL statement.