Google Cloud Bigtable C++ Client  1.17.0
A C++ Client Library for Google Cloud Bigtable
table.cc
Go to the documentation of this file.
1 // Copyright 2017 Google Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 #include "google/cloud/bigtable/internal/async_bulk_apply.h"
17 #include "google/cloud/bigtable/internal/bulk_mutator.h"
18 #include "google/cloud/bigtable/internal/unary_client_utils.h"
20 #include "google/cloud/internal/async_retry_unary_rpc.h"
21 #include <thread>
22 #include <type_traits>
23 
24 namespace btproto = ::google::bigtable::v2;
25 namespace google {
26 namespace cloud {
27 namespace bigtable {
28 inline namespace BIGTABLE_CLIENT_NS {
29 namespace {
30 template <typename Request>
31 void SetCommonTableOperationRequest(Request& request,
32  std::string const& app_profile_id,
33  std::string const& table_name) {
34  request.set_app_profile_id(app_profile_id);
35  request.set_table_name(table_name);
36 }
37 
38 template <typename Response>
39 Row TransformReadModifyWriteRowResponse(Response& response) {
40  std::vector<bigtable::Cell> cells;
41  auto& row = *response.mutable_row();
42  for (auto& family : *row.mutable_families()) {
43  for (auto& column : *family.mutable_columns()) {
44  for (auto& cell : *column.mutable_cells()) {
45  std::vector<std::string> labels;
46  std::move(cell.mutable_labels()->begin(), cell.mutable_labels()->end(),
47  std::back_inserter(labels));
48  bigtable::Cell new_cell(row.key(), family.name(), column.qualifier(),
49  cell.timestamp_micros(),
50  std::move(*cell.mutable_value()),
51  std::move(labels));
52 
53  cells.emplace_back(std::move(new_cell));
54  }
55  }
56  }
57 
58  return Row(std::move(*row.mutable_key()), std::move(cells));
59 }
60 
61 } // namespace
62 
63 using ClientUtils = bigtable::internal::UnaryClientUtils<DataClient>;
64 
65 static_assert(std::is_copy_assignable<bigtable::Table>::value,
66  "bigtable::Table must be CopyAssignable");
67 
68 Status Table::Apply(SingleRowMutation mut) {
69  // Copy the policies in effect for this operation. Many policy classes change
70  // their state as the operation makes progress (or fails to make progress), so
71  // we need fresh instances.
72  auto rpc_policy = clone_rpc_retry_policy();
73  auto backoff_policy = clone_rpc_backoff_policy();
74  auto idempotent_policy = clone_idempotent_mutation_policy();
75 
76  // Build the RPC request, try to minimize copying.
77  btproto::MutateRowRequest request;
78  SetCommonTableOperationRequest<btproto::MutateRowRequest>(
79  request, app_profile_id_, table_name_);
80  mut.MoveTo(request);
81 
82  bool const is_idempotent =
83  std::all_of(request.mutations().begin(), request.mutations().end(),
84  [&idempotent_policy](btproto::Mutation const& m) {
85  return idempotent_policy->is_idempotent(m);
86  });
87 
88  btproto::MutateRowResponse response;
89  grpc::Status status;
90  while (true) {
91  grpc::ClientContext client_context;
92  rpc_policy->Setup(client_context);
93  backoff_policy->Setup(client_context);
94  metadata_update_policy_.Setup(client_context);
95  status = client_->MutateRow(&client_context, request, &response);
96 
97  if (status.ok()) {
98  return google::cloud::Status{};
99  }
100  // It is up to the policy to terminate this loop, it could run
101  // forever, but that would be a bad policy (pun intended).
102  if (!rpc_policy->OnFailure(status) || !is_idempotent) {
103  return MakeStatusFromRpcError(status);
104  }
105  auto delay = backoff_policy->OnCompletion(status);
106  std::this_thread::sleep_for(delay);
107  }
108 }
109 
110 future<Status> Table::AsyncApply(SingleRowMutation mut, CompletionQueue& cq) {
111  google::bigtable::v2::MutateRowRequest request;
112  SetCommonTableOperationRequest<google::bigtable::v2::MutateRowRequest>(
113  request, app_profile_id_, table_name_);
114  mut.MoveTo(request);
115  auto context = absl::make_unique<grpc::ClientContext>();
116 
117  // Determine if all the mutations are idempotent. The idempotency of the
118  // mutations won't change as the retry loop executes, so we can just compute
119  // it once and use a constant value for the loop.
120  auto idempotent_mutation_policy = clone_idempotent_mutation_policy();
121  bool const is_idempotent = std::all_of(
122  request.mutations().begin(), request.mutations().end(),
123  [&idempotent_mutation_policy](google::bigtable::v2::Mutation const& m) {
124  return idempotent_mutation_policy->is_idempotent(m);
125  });
126 
127  auto client = client_;
128  auto metadata_update_policy = clone_metadata_update_policy();
129  return google::cloud::internal::StartRetryAsyncUnaryRpc(
130  cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
131  is_idempotent,
132  [client, metadata_update_policy](
133  grpc::ClientContext* context,
134  google::bigtable::v2::MutateRowRequest const& request,
135  grpc::CompletionQueue* cq) {
136  metadata_update_policy.Setup(*context);
137  return client->AsyncMutateRow(context, request, cq);
138  },
139  std::move(request))
140  .then([](future<StatusOr<google::bigtable::v2::MutateRowResponse>> r) {
141  return r.get().status();
142  });
143 }
144 
145 std::vector<FailedMutation> Table::BulkApply(BulkMutation mut) {
146  grpc::Status status;
147 
148  // Copy the policies in effect for this operation. Many policy classes change
149  // their state as the operation makes progress (or fails to make progress), so
150  // we need fresh instances.
151  auto backoff_policy = clone_rpc_backoff_policy();
152  auto retry_policy = clone_rpc_retry_policy();
153  auto idemponent_policy = clone_idempotent_mutation_policy();
154 
155  bigtable::internal::BulkMutator mutator(app_profile_id_, table_name_,
156  *idemponent_policy, std::move(mut));
157  while (mutator.HasPendingMutations()) {
158  grpc::ClientContext client_context;
159  backoff_policy->Setup(client_context);
160  retry_policy->Setup(client_context);
161  metadata_update_policy_.Setup(client_context);
162  status = mutator.MakeOneRequest(*client_, client_context);
163  if (!status.ok() && !retry_policy->OnFailure(status)) {
164  break;
165  }
166  auto delay = backoff_policy->OnCompletion(status);
167  std::this_thread::sleep_for(delay);
168  }
169  return std::move(mutator).OnRetryDone();
170 }
171 
172 future<std::vector<FailedMutation>> Table::AsyncBulkApply(BulkMutation mut,
173  CompletionQueue& cq) {
174  auto mutation_policy = clone_idempotent_mutation_policy();
175  return internal::AsyncRetryBulkApply::Create(
176  cq, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
177  *mutation_policy, clone_metadata_update_policy(), client_,
178  app_profile_id_, table_name(), std::move(mut));
179 }
180 
181 RowReader Table::ReadRows(RowSet row_set, Filter filter) {
182  return RowReader(
183  client_, app_profile_id_, table_name_, std::move(row_set),
184  RowReader::NO_ROWS_LIMIT, std::move(filter), clone_rpc_retry_policy(),
185  clone_rpc_backoff_policy(), metadata_update_policy_,
186  absl::make_unique<bigtable::internal::ReadRowsParserFactory>());
187 }
188 
189 RowReader Table::ReadRows(RowSet row_set, std::int64_t rows_limit,
190  Filter filter) {
191  return RowReader(
192  client_, app_profile_id_, table_name_, std::move(row_set), rows_limit,
193  std::move(filter), clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
194  metadata_update_policy_,
195  absl::make_unique<bigtable::internal::ReadRowsParserFactory>());
196 }
197 
198 StatusOr<std::pair<bool, Row>> Table::ReadRow(std::string row_key,
199  Filter filter) {
200  RowSet row_set(std::move(row_key));
201  std::int64_t const rows_limit = 1;
202  RowReader reader =
203  ReadRows(std::move(row_set), rows_limit, std::move(filter));
204 
205  auto it = reader.begin();
206  if (it == reader.end()) {
207  return std::make_pair(false, Row("", {}));
208  }
209  if (!*it) {
210  return it->status();
211  }
212  auto result = std::make_pair(true, std::move(**it));
213  if (++it != reader.end()) {
214  return Status(StatusCode::kInternal,
215  "internal error - RowReader returned 2 rows in ReadRow()");
216  }
217  return result;
218 }
219 
220 StatusOr<MutationBranch> Table::CheckAndMutateRow(
221  std::string row_key, Filter filter, std::vector<Mutation> true_mutations,
222  std::vector<Mutation> false_mutations) {
223  grpc::Status status;
224  btproto::CheckAndMutateRowRequest request;
225  request.set_row_key(std::move(row_key));
226  SetCommonTableOperationRequest<btproto::CheckAndMutateRowRequest>(
227  request, app_profile_id_, table_name_);
228  *request.mutable_predicate_filter() = std::move(filter).as_proto();
229  for (auto& m : true_mutations) {
230  *request.add_true_mutations() = std::move(m.op);
231  }
232  for (auto& m : false_mutations) {
233  *request.add_false_mutations() = std::move(m.op);
234  }
235  bool const is_idempotent =
236  idempotent_mutation_policy_->is_idempotent(request);
237  auto response = ClientUtils::MakeCall(
238  *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
239  metadata_update_policy_, &DataClient::CheckAndMutateRow, request,
240  "Table::CheckAndMutateRow", status, is_idempotent);
241 
242  if (!status.ok()) {
243  return MakeStatusFromRpcError(status);
244  }
245  return response.predicate_matched() ? MutationBranch::kPredicateMatched
246  : MutationBranch::kPredicateNotMatched;
247 }
248 
249 future<StatusOr<MutationBranch>> Table::AsyncCheckAndMutateRow(
250  std::string row_key, Filter filter, std::vector<Mutation> true_mutations,
251  std::vector<Mutation> false_mutations, CompletionQueue& cq) {
252  btproto::CheckAndMutateRowRequest request;
253  request.set_row_key(std::move(row_key));
254  SetCommonTableOperationRequest<btproto::CheckAndMutateRowRequest>(
255  request, app_profile_id_, table_name_);
256  *request.mutable_predicate_filter() = std::move(filter).as_proto();
257  for (auto& m : true_mutations) {
258  *request.add_true_mutations() = std::move(m.op);
259  }
260  for (auto& m : false_mutations) {
261  *request.add_false_mutations() = std::move(m.op);
262  }
263  bool const is_idempotent =
264  idempotent_mutation_policy_->is_idempotent(request);
265 
266  auto client = client_;
267  auto metadata_update_policy = clone_metadata_update_policy();
268  return google::cloud::internal::StartRetryAsyncUnaryRpc(
269  cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
270  is_idempotent,
271  [client, metadata_update_policy](
272  grpc::ClientContext* context,
273  btproto::CheckAndMutateRowRequest const& request,
274  grpc::CompletionQueue* cq) {
275  metadata_update_policy.Setup(*context);
276  return client->AsyncCheckAndMutateRow(context, request, cq);
277  },
278  std::move(request))
279  .then([](future<StatusOr<btproto::CheckAndMutateRowResponse>> f)
280  -> StatusOr<MutationBranch> {
281  auto response = f.get();
282  if (!response) {
283  return response.status();
284  }
285  return response->predicate_matched()
286  ? MutationBranch::kPredicateMatched
287  : MutationBranch::kPredicateNotMatched;
288  });
289 }
290 
291 // Call the `google.bigtable.v2.Bigtable.SampleRowKeys` RPC until
292 // successful. When RPC is finished, this function returns the SampleRowKeys
293 // as a std::vector<>. If the RPC fails, it will keep retrying until the
294 // policies in effect tell us to stop. Note that each retry must clear the
295 // samples otherwise the result is an inconsistent set of samples row keys.
296 StatusOr<std::vector<bigtable::RowKeySample>> Table::SampleRows() {
297  // Copy the policies in effect for this operation.
298  auto backoff_policy = clone_rpc_backoff_policy();
299  auto retry_policy = clone_rpc_retry_policy();
300  std::vector<bigtable::RowKeySample> samples;
301 
302  // Build the RPC request for SampleRowKeys
303  btproto::SampleRowKeysRequest request;
304  btproto::SampleRowKeysResponse response;
305  SetCommonTableOperationRequest<btproto::SampleRowKeysRequest>(
306  request, app_profile_id_, table_name_);
307 
308  while (true) {
309  grpc::ClientContext client_context;
310  backoff_policy->Setup(client_context);
311  retry_policy->Setup(client_context);
312  clone_metadata_update_policy().Setup(client_context);
313 
314  auto stream = client_->SampleRowKeys(&client_context, request);
315  while (stream->Read(&response)) {
316  bigtable::RowKeySample row_sample;
317  row_sample.offset_bytes = response.offset_bytes();
318  row_sample.row_key = std::move(*response.mutable_row_key());
319  samples.emplace_back(std::move(row_sample));
320  }
321  auto status = stream->Finish();
322  if (status.ok()) {
323  break;
324  }
325  if (!retry_policy->OnFailure(status)) {
326  return MakeStatusFromRpcError(
327  status.error_code(),
328  "Retry policy exhausted: " + status.error_message());
329  }
330  samples.clear();
331  auto delay = backoff_policy->OnCompletion(status);
332  std::this_thread::sleep_for(delay);
333  }
334  return samples;
335 }
336 
337 StatusOr<Row> Table::ReadModifyWriteRowImpl(
338  btproto::ReadModifyWriteRowRequest request) {
339  SetCommonTableOperationRequest<
340  ::google::bigtable::v2::ReadModifyWriteRowRequest>(
341  request, app_profile_id_, table_name_);
342 
343  grpc::Status status;
344  auto response = ClientUtils::MakeNonIdemponentCall(
345  *(client_), clone_rpc_retry_policy(), clone_metadata_update_policy(),
346  &DataClient::ReadModifyWriteRow, request, "ReadModifyWriteRowRequest",
347  status);
348  if (!status.ok()) {
349  return MakeStatusFromRpcError(status);
350  }
351  return TransformReadModifyWriteRowResponse<
352  btproto::ReadModifyWriteRowResponse>(response);
353 }
354 
355 future<StatusOr<Row>> Table::AsyncReadModifyWriteRowImpl(
356  CompletionQueue& cq,
357  ::google::bigtable::v2::ReadModifyWriteRowRequest request) {
358  SetCommonTableOperationRequest<
359  ::google::bigtable::v2::ReadModifyWriteRowRequest>(
360  request, app_profile_id_, table_name_);
361 
362  auto client = client_;
363  auto metadata_update_policy = clone_metadata_update_policy();
364  return google::cloud::internal::StartRetryAsyncUnaryRpc(
365  cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
366  /*is_idempotent=*/false,
367  [client, metadata_update_policy](
368  grpc::ClientContext* context,
369  btproto::ReadModifyWriteRowRequest const& request,
370  grpc::CompletionQueue* cq) {
371  metadata_update_policy.Setup(*context);
372  return client->AsyncReadModifyWriteRow(context, request, cq);
373  },
374  std::move(request))
375  .then([](future<StatusOr<btproto::ReadModifyWriteRowResponse>> fut)
376  -> StatusOr<Row> {
377  auto result = fut.get();
378  if (!result) {
379  return result.status();
380  }
381  return TransformReadModifyWriteRowResponse<
382  btproto::ReadModifyWriteRowResponse>(*result);
383  });
384 }
385 
386 future<StatusOr<std::pair<bool, Row>>> Table::AsyncReadRow(CompletionQueue& cq,
387  std::string row_key,
388  Filter filter) {
389  class AsyncReadRowHandler {
390  public:
391  AsyncReadRowHandler() : row_("", {}) {}
392 
393  future<StatusOr<std::pair<bool, Row>>> GetFuture() {
394  return row_promise_.get_future();
395  }
396 
397  future<bool> OnRow(Row row) {
398  // assert(!row_received_);
399  row_ = std::move(row);
400  row_received_ = true;
401  // Don't satisfy the promise before `OnStreamFinished`.
402  //
403  // The `CompletionQueue`, which this object holds a reference to, should
404  // not be shut down before `OnStreamFinished` is called. In order to make
405  // sure of that, satisying the `promise<>` is deferred until then - the
406  // user shouldn't shutown the `CompleetionQue` before this whole
407  // operations is done.
408  return make_ready_future(false);
409  }
410 
411  void OnStreamFinished(Status status) {
412  if (row_received_) {
413  // If we got a row we don't need to care about the stream status.
414  row_promise_.set_value(std::make_pair(true, std::move(row_)));
415  return;
416  }
417  if (status.ok()) {
418  row_promise_.set_value(std::make_pair(false, Row("", {})));
419  } else {
420  row_promise_.set_value(std::move(status));
421  }
422  }
423 
424  private:
425  Row row_;
426  bool row_received_{};
427  promise<StatusOr<std::pair<bool, Row>>> row_promise_;
428  };
429 
430  RowSet row_set(std::move(row_key));
431  std::int64_t const rows_limit = 1;
432  auto handler = std::make_shared<AsyncReadRowHandler>();
433  AsyncReadRows(
434  cq, [handler](Row row) { return handler->OnRow(std::move(row)); },
435  [handler](Status status) {
436  handler->OnStreamFinished(std::move(status));
437  },
438  std::move(row_set), rows_limit, std::move(filter));
439  return handler->GetFuture();
440 }
441 
442 } // namespace BIGTABLE_CLIENT_NS
443 } // namespace bigtable
444 } // namespace cloud
445 } // namespace google
Object returned by Table::ReadRows(), enumerates rows in the response.
Definition: row_reader.h:44
Define the interfaces to create filter expressions.
Definition: filters.h:49
future< typename internal::make_ready_return< T >::type > make_ready_future(T &&t)
google::cloud::Status MakeStatusFromRpcError(grpc::Status const &status)
iterator end()
End iterator over the rows in the response.
Definition: row_reader.cc:104
Represent a set of mutations across multiple rows.
Definition: mutations.h:467
The in-memory representation of a Bigtable row.
Definition: row.h:34
Represent a (possibly non-continuous) set of row keys.
Definition: row_set.h:31
#define BIGTABLE_CLIENT_NS
Definition: version.h:22
iterator begin()
Input iterator over rows in the response.
Definition: row_reader.cc:98
void MoveTo(google::bigtable::v2::MutateRowsRequest::Entry *entry)
Move the contents into a bigtable::v2::MutateRowsRequest::Entry.
Definition: mutations.h:357
Represent a single row mutation.
Definition: mutations.h:293
bigtable::internal::UnaryClientUtils< InstanceAdminClient > ClientUtils
Shortcuts to avoid typing long names over and over.