Google Cloud Spanner C++ Client
A C++ Client Library for Google Cloud Spanner
client.cc
Go to the documentation of this file.
1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
17 #include "google/cloud/spanner/internal/connection_impl.h"
18 #include "google/cloud/spanner/internal/retry_loop.h"
19 #include "google/cloud/spanner/internal/spanner_stub.h"
20 #include "google/cloud/spanner/internal/status_utils.h"
23 #include "google/cloud/internal/getenv.h"
24 #include "google/cloud/log.h"
25 #include <grpcpp/grpcpp.h>
26 #include <thread>
27 
28 namespace google {
29 namespace cloud {
30 namespace spanner {
31 inline namespace SPANNER_CLIENT_NS {
32 
33 RowStream Client::Read(std::string table, KeySet keys,
34  std::vector<std::string> columns,
35  ReadOptions read_options) {
36  return conn_->Read(
37  {internal::MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
38  std::move(table),
39  std::move(keys),
40  std::move(columns),
41  std::move(read_options),
42  {}});
43 }
44 
45 RowStream Client::Read(Transaction::SingleUseOptions transaction_options,
46  std::string table, KeySet keys,
47  std::vector<std::string> columns,
48  ReadOptions read_options) {
49  return conn_->Read(
50  {internal::MakeSingleUseTransaction(std::move(transaction_options)),
51  std::move(table),
52  std::move(keys),
53  std::move(columns),
54  std::move(read_options),
55  {}});
56 }
57 
58 RowStream Client::Read(Transaction transaction, std::string table, KeySet keys,
59  std::vector<std::string> columns,
60  ReadOptions read_options) {
61  return conn_->Read({std::move(transaction),
62  std::move(table),
63  std::move(keys),
64  std::move(columns),
65  std::move(read_options),
66  {}});
67 }
68 
69 RowStream Client::Read(ReadPartition const& read_partition) {
70  return conn_->Read(internal::MakeReadParams(read_partition));
71 }
72 
73 StatusOr<std::vector<ReadPartition>> Client::PartitionRead(
74  Transaction transaction, std::string table, KeySet keys,
75  std::vector<std::string> columns, ReadOptions read_options,
76  PartitionOptions const& partition_options) {
77  return conn_->PartitionRead({{std::move(transaction),
78  std::move(table),
79  std::move(keys),
80  std::move(columns),
81  std::move(read_options),
82  {}},
83  partition_options});
84 }
85 
86 RowStream Client::ExecuteQuery(SqlStatement statement,
87  QueryOptions const& opts) {
88  return conn_->ExecuteQuery(
89  {internal::MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
90  std::move(statement),
91  OverlayQueryOptions(opts),
92  {}});
93 }
94 
95 RowStream Client::ExecuteQuery(
96  Transaction::SingleUseOptions transaction_options, SqlStatement statement,
97  QueryOptions const& opts) {
98  return conn_->ExecuteQuery(
99  {internal::MakeSingleUseTransaction(std::move(transaction_options)),
100  std::move(statement),
101  OverlayQueryOptions(opts),
102  {}});
103 }
104 
105 RowStream Client::ExecuteQuery(Transaction transaction, SqlStatement statement,
106  QueryOptions const& opts) {
107  return conn_->ExecuteQuery({std::move(transaction),
108  std::move(statement),
109  OverlayQueryOptions(opts),
110  {}});
111 }
112 
113 RowStream Client::ExecuteQuery(QueryPartition const& partition,
114  QueryOptions const& opts) {
115  auto params = internal::MakeSqlParams(partition);
116  params.query_options = OverlayQueryOptions(opts);
117  return conn_->ExecuteQuery(std::move(params));
118 }
119 
120 ProfileQueryResult Client::ProfileQuery(SqlStatement statement,
121  QueryOptions const& opts) {
122  return conn_->ProfileQuery(
123  {internal::MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
124  std::move(statement),
125  OverlayQueryOptions(opts),
126  {}});
127 }
128 
129 ProfileQueryResult Client::ProfileQuery(
130  Transaction::SingleUseOptions transaction_options, SqlStatement statement,
131  QueryOptions const& opts) {
132  return conn_->ProfileQuery(
133  {internal::MakeSingleUseTransaction(std::move(transaction_options)),
134  std::move(statement),
135  OverlayQueryOptions(opts),
136  {}});
137 }
138 
139 ProfileQueryResult Client::ProfileQuery(Transaction transaction,
140  SqlStatement statement,
141  QueryOptions const& opts) {
142  return conn_->ProfileQuery({std::move(transaction),
143  std::move(statement),
144  OverlayQueryOptions(opts),
145  {}});
146 }
147 
148 StatusOr<std::vector<QueryPartition>> Client::PartitionQuery(
149  Transaction transaction, SqlStatement statement,
150  PartitionOptions const& partition_options) {
151  return conn_->PartitionQuery(
152  {std::move(transaction), std::move(statement), partition_options});
153 }
154 
155 StatusOr<DmlResult> Client::ExecuteDml(Transaction transaction,
156  SqlStatement statement,
157  QueryOptions const& opts) {
158  return conn_->ExecuteDml({std::move(transaction),
159  std::move(statement),
160  OverlayQueryOptions(opts),
161  {}});
162 }
163 
164 StatusOr<ProfileDmlResult> Client::ProfileDml(Transaction transaction,
165  SqlStatement statement,
166  QueryOptions const& opts) {
167  return conn_->ProfileDml({std::move(transaction),
168  std::move(statement),
169  OverlayQueryOptions(opts),
170  {}});
171 }
172 
173 StatusOr<ExecutionPlan> Client::AnalyzeSql(Transaction transaction,
174  SqlStatement statement,
175  QueryOptions const& opts) {
176  return conn_->AnalyzeSql({std::move(transaction),
177  std::move(statement),
178  OverlayQueryOptions(opts),
179  {}});
180 }
181 
182 StatusOr<BatchDmlResult> Client::ExecuteBatchDml(
183  Transaction transaction, std::vector<SqlStatement> statements) {
184  return conn_->ExecuteBatchDml(
185  {std::move(transaction), std::move(statements)});
186 }
187 
188 StatusOr<CommitResult> Client::Commit(
189  std::function<StatusOr<Mutations>(Transaction)> const& mutator,
190  std::unique_ptr<TransactionRerunPolicy> rerun_policy,
191  std::unique_ptr<BackoffPolicy> backoff_policy) {
192  // The status-code discriminator of TransactionRerunPolicy.
193  using RerunnablePolicy = internal::SafeTransactionRerun;
194 
196  for (int rerun = 0;; ++rerun) {
197  StatusOr<Mutations> mutations;
198 #if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
199  try {
200 #endif
201  mutations = mutator(txn);
202 #if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
203  } catch (RuntimeStatusError const& error) {
204  // Treat this like mutator() returned a bad Status.
205  Status status = error.status();
206  if (status.ok()) {
207  status = Status(StatusCode::kUnknown, "OK Status thrown from mutator");
208  }
209  mutations = status;
210  } catch (...) {
211  auto rb_status = Rollback(txn);
212  if (!RerunnablePolicy::IsOk(rb_status)) {
213  GCP_LOG(WARNING) << "Rollback() failure in Client::Commit(): "
214  << rb_status.message();
215  }
216  throw;
217  }
218 #endif
219  auto status = mutations.status();
220  if (RerunnablePolicy::IsOk(status)) {
221  auto result = Commit(txn, *mutations);
222  status = result.status();
223  if (!RerunnablePolicy::IsTransientFailure(status)) {
224  return result;
225  }
226  } else {
227  if (!RerunnablePolicy::IsTransientFailure(status)) {
228  auto rb_status = Rollback(txn);
229  if (!RerunnablePolicy::IsOk(rb_status)) {
230  GCP_LOG(WARNING) << "Rollback() failure in Client::Commit(): "
231  << rb_status.message();
232  }
233  return status;
234  }
235  }
236  // A transient failure (e.g., kAborted), so consider rerunning.
237  if (!rerun_policy->OnFailure(status)) {
238  return status; // reruns exhausted
239  }
240  if (internal::IsSessionNotFound(status)) {
241  // Marks the session bad and creates a new Transaction for the next loop.
242  internal::Visit(txn, [](internal::SessionHolder& s,
243  google::spanner::v1::TransactionSelector const&,
244  std::int64_t) {
245  if (s) s->set_bad();
246  return true;
247  });
248  txn = MakeReadWriteTransaction();
249  } else {
250  // Create a new transaction for the next loop, but reuse the session
251  // so that we have a slightly better chance of avoiding another abort.
252  txn = MakeReadWriteTransaction(txn);
253  }
254  std::this_thread::sleep_for(backoff_policy->OnCompletion());
255  }
256 }
257 
258 StatusOr<CommitResult> Client::Commit(
259  std::function<StatusOr<Mutations>(Transaction)> const& mutator) {
260  auto const rerun_maximum_duration = std::chrono::minutes(10);
261  auto default_commit_rerun_policy =
262  LimitedTimeTransactionRerunPolicy(rerun_maximum_duration).clone();
263 
264  auto const backoff_initial_delay = std::chrono::milliseconds(100);
265  auto const backoff_maximum_delay = std::chrono::minutes(5);
266  auto const backoff_scaling = 2.0;
267  auto default_commit_backoff_policy =
268  ExponentialBackoffPolicy(backoff_initial_delay, backoff_maximum_delay,
269  backoff_scaling)
270  .clone();
271 
272  return Commit(mutator, std::move(default_commit_rerun_policy),
273  std::move(default_commit_backoff_policy));
274 }
275 
276 StatusOr<CommitResult> Client::Commit(Mutations mutations) {
277  return Commit([&mutations](Transaction const&) { return mutations; });
278 }
279 
280 StatusOr<CommitResult> Client::Commit(Transaction transaction,
281  Mutations mutations) {
282  return conn_->Commit({std::move(transaction), std::move(mutations)});
283 }
284 
285 Status Client::Rollback(Transaction transaction) {
286  return conn_->Rollback({std::move(transaction)});
287 }
288 
289 StatusOr<PartitionedDmlResult> Client::ExecutePartitionedDml(
290  SqlStatement statement) {
291  return conn_->ExecutePartitionedDml({std::move(statement)});
292 }
293 
294 // Returns a QueryOptions struct that has each field set according to the
295 // hierarchy that options specified as to the function call (i.e., `preferred`)
296 // are preferred, followed by options set at the Client level, followed by an
297 // environment variable. If none are set, the field's optional will be unset
298 // and nothing will be included in the proto sent to Spanner, in which case,
299 // the Database default will be used.
300 QueryOptions Client::OverlayQueryOptions(QueryOptions const& preferred) {
301  // GetEnv() is not super fast, so we look it up once and cache it.
302  static auto const* const kOptimizerVersionEnvValue =
303  new auto(google::cloud::internal::GetEnv("SPANNER_OPTIMIZER_VERSION"));
304 
305  QueryOptions const& fallback = opts_.query_options();
306  QueryOptions opts;
307 
308  // Choose the `optimizer_version` option.
309  if (preferred.optimizer_version().has_value()) {
310  opts.set_optimizer_version(preferred.optimizer_version());
311  } else if (fallback.optimizer_version().has_value()) {
312  opts.set_optimizer_version(fallback.optimizer_version());
313  } else if (kOptimizerVersionEnvValue->has_value()) {
314  opts.set_optimizer_version(*kOptimizerVersionEnvValue);
315  }
316 
317  return opts;
318 }
319 
320 std::shared_ptr<Connection> MakeConnection(
321  Database const& db, ConnectionOptions const& connection_options,
322  SessionPoolOptions session_pool_options) {
323  return MakeConnection(db, connection_options, std::move(session_pool_options),
324  internal::DefaultConnectionRetryPolicy(),
325  internal::DefaultConnectionBackoffPolicy());
326 }
327 
328 std::shared_ptr<Connection> MakeConnection(
329  Database const& db, ConnectionOptions const& connection_options,
330  SessionPoolOptions session_pool_options,
331  std::unique_ptr<RetryPolicy> retry_policy,
332  std::unique_ptr<BackoffPolicy> backoff_policy) {
333  std::vector<std::shared_ptr<internal::SpannerStub>> stubs;
334  int num_channels = std::max(connection_options.num_channels(), 1);
335  stubs.reserve(num_channels);
336  for (int channel_id = 0; channel_id < num_channels; ++channel_id) {
337  stubs.push_back(
338  internal::CreateDefaultSpannerStub(connection_options, channel_id));
339  }
341  db, std::move(stubs), connection_options, std::move(session_pool_options),
342  std::move(retry_policy), std::move(backoff_policy));
343 }
344 
345 } // namespace SPANNER_CLIENT_NS
346 } // namespace spanner
347 } // namespace cloud
348 } // namespace google
optional< std::string > const & optimizer_version() const
Returns the optimizer version.
Definition: query_options.h:40
Options for "single-use", ReadOnly transactions, where Spanner chooses the read timestamp,...
Definition: transaction.h:109
Represents the stream of Rows and profile stats returned from spanner::Client::ProfileQuery().
Definition: results.h:150
std::vector< Mutation > Mutations
An ordered sequence of mutations to pass to Client::Commit() or return from the Client::Commit() muta...
Definition: mutations.h:95
Controls the session pool maintained by a spanner::Client.
The QueryPartition class is a regular type that represents a single slice of a parallel SQL read.
The KeySet class is a regular type that represents a collection of Keys.
Definition: keys.h:158
google::cloud::internal::LimitedTimeRetryPolicy< google::cloud::Status, internal::SafeTransactionRerun > LimitedTimeTransactionRerunPolicy
A transaction rerun policy that limits the duration of the rerun loop.
Definition: retry_policy.h:80
Contains all the Cloud Spanner C++ client types and functions.
These QueryOptions allow users to configure features about how their SQL queries executes on the serv...
Definition: query_options.h:31
google::cloud::ConnectionOptions< ConnectionOptionsTraits > ConnectionOptions
The options for Cloud Spanner connections.
Transaction MakeReadWriteTransaction(Transaction::ReadWriteOptions opts={})
Create a read-write transaction configured with opts.
Definition: transaction.h:196
The ReadPartition class is a regular type that represents a single slice of a parallel Read operation...
google::cloud::internal::ExponentialBackoffPolicy ExponentialBackoffPolicy
A truncated exponential backoff policy with randomized periods.
#define SPANNER_CLIENT_NS
Definition: version.h:22
#define GCP_LOG(level)
std::shared_ptr< Connection > MakeConnection(Database const &db, ConnectionOptions const &connection_options, SessionPoolOptions session_pool_options, std::unique_ptr< RetryPolicy > retry_policy, std::unique_ptr< BackoffPolicy > backoff_policy)
Returns a Connection object that can be used for interacting with Spanner.
Definition: client.cc:328
QueryOptions & set_optimizer_version(optional< std::string > version)
Sets the optimizerion version to the specified integer string.
Definition: query_options.h:49
The representation of a Cloud Spanner transaction.
Definition: transaction.h:63
This class identifies a Cloud Spanner Database.
Definition: database.h:43
Represents the stream of Rows returned from spanner::Client::Read() or spanner::Client::ExecuteQuery(...
Definition: results.h:68
Options passed to Client::PartitionRead or Client::PartitionQuery.
Options passed to Client::Read or Client::PartitionRead.
Definition: read_options.h:28
Represents a potentially parameterized SQL statement.
Definition: sql_statement.h:55