Google Cloud Bigtable C++ Client  1.1.0
A C++ Client Library for Google Cloud Bigtable
table.cc
Go to the documentation of this file.
1 // Copyright 2017 Google Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/bigtable/table.h"
16 #include "google/cloud/bigtable/internal/async_bulk_apply.h"
17 #include "google/cloud/bigtable/internal/async_retry_unary_rpc.h"
18 #include "google/cloud/bigtable/internal/bulk_mutator.h"
19 #include "google/cloud/bigtable/internal/unary_client_utils.h"
20 #include "google/cloud/grpc_utils/grpc_error_delegate.h"
21 #include <thread>
22 #include <type_traits>
23 
24 namespace btproto = ::google::bigtable::v2;
25 namespace google {
26 namespace cloud {
27 namespace bigtable {
28 inline namespace BIGTABLE_CLIENT_NS {
29 namespace {
// Populate the two fields that every table-level data request has in common.
//
// `Request` can be any type exposing `set_app_profile_id()` and
// `set_table_name()`. The request is modified in place; both strings are
// handed to the corresponding setter unchanged.
template <typename Request>
void SetCommonTableOperationRequest(Request& request,
                                    std::string const& app_profile_id,
                                    std::string const& table_name) {
  // Application profile first, then the table name.
  request.set_app_profile_id(app_profile_id);
  request.set_table_name(table_name);
}
37 
38 template <typename Response>
39 Row TransformReadModifyWriteRowResponse(Response& response) {
40  std::vector<bigtable::Cell> cells;
41  auto& row = *response.mutable_row();
42  for (auto& family : *row.mutable_families()) {
43  for (auto& column : *family.mutable_columns()) {
44  for (auto& cell : *column.mutable_cells()) {
45  std::vector<std::string> labels;
46  std::move(cell.mutable_labels()->begin(), cell.mutable_labels()->end(),
47  std::back_inserter(labels));
48  bigtable::Cell new_cell(row.key(), family.name(), column.qualifier(),
49  cell.timestamp_micros(),
50  std::move(*cell.mutable_value()),
51  std::move(labels));
52 
53  cells.emplace_back(std::move(new_cell));
54  }
55  }
56  }
57 
58  return Row(std::move(*row.mutable_key()), std::move(cells));
59 }
60 
61 } // namespace
62 
64 
// Compile-time check: the build fails with the message below if
// `bigtable::Table` ever stops being copy-assignable.
65 static_assert(std::is_copy_assignable<bigtable::Table>::value,
66  "bigtable::Table must be CopyAssignable");
67 
69  // Copy the policies in effect for this operation. Many policy classes change
70  // their state as the operation makes progress (or fails to make progress), so
71  // we need fresh instances.
75 
76  // Build the RPC request, try to minimize copying.
81 
82  bool const is_idempotent =
84  [&idempotent_policy](btproto::Mutation const& m) {
86  });
87 
90  while (true) {
96 
97  if (status.ok()) {
98  return google::cloud::Status{};
99  }
100  // It is up to the policy to terminate this loop, it could run
101  // forever, but that would be a bad policy (pun intended).
104  status.error_code(),
105  "Permanent (or too many transient) errors in Table::Apply()");
106  }
109  }
110 }
111 
116  mut.MoveTo(request);
118 
119  // Determine if all the mutations are idempotent. The idempotency of the
120  // mutations won't change as the retry loop executes, so we can just compute
121  // it once and use a constant value for the loop.
123  bool const is_idempotent = std::all_of(
127  });
128 
129  auto client = client_;
136  grpc::CompletionQueue* cq) {
138  },
139  std::move(request), cq)
141  return r.get().status();
142  });
143 }
144 
146  grpc::Status status;
147 
148  // Copy the policies in effect for this operation. Many policy classes change
149  // their state as the operation makes progress (or fails to make progress), so
150  // we need fresh instances.
154 
157  while (mutator.HasPendingMutations()) {
163  if (!status.ok() && !retry_policy->OnFailure(status)) {
164  break;
165  }
168  }
169  return std::move(mutator).OnRetryDone();
170 }
171 
173  CompletionQueue& cq) {
179 }
180 
188 }
189 
191  Filter filter) {
197 }
198 
200  Filter filter) {
202  std::int64_t const rows_limit = 1;
203  RowReader reader =
205 
206  auto it = reader.begin();
207  if (it == reader.end()) {
208  return std::make_pair(false, Row("", {}));
209  }
210  if (!*it) {
211  return it->status();
212  }
213  auto result = std::make_pair(true, std::move(**it));
214  if (++it != reader.end()) {
215  return Status(StatusCode::kInternal,
216  "internal error - RowReader returned 2 rows in ReadRow()");
217  }
218  return result;
219 }
220 
224  grpc::Status status;
230  for (auto& m : true_mutations) {
232  }
233  for (auto& m : false_mutations) {
235  }
236  bool const is_idempotent =
238  auto response = ClientUtils::MakeCall(
241  "Table::CheckAndMutateRow", status, is_idempotent);
242 
243  if (!status.ok()) {
245  }
248 }
249 
258  for (auto& m : true_mutations) {
260  }
261  for (auto& m : false_mutations) {
263  }
264  bool const is_idempotent =
266 
267  auto client = client_;
274  grpc::CompletionQueue* cq) {
276  },
277  std::move(request), cq)
280  auto response = f.get();
281  if (!response) {
282  return response.status();
283  }
284  return response->predicate_matched()
287  });
288 }
289 
290 // Call the `google.bigtable.v2.Bigtable.SampleRowKeys` RPC until it is
291 // successful. When the RPC finishes, this function returns the sampled row
292 // keys as a std::vector<>. If the RPC fails, it will keep retrying until the
293 // policies in effect tell us to stop. Note that each retry must clear the
294 // samples, otherwise the result is an inconsistent set of sample row keys.
296  // Copy the policies in effect for this operation.
300 
301  // Build the RPC request for SampleRowKeys
306 
307  while (true) {
312 
314  while (stream->Read(&response)) {
319  }
320  auto status = stream->Finish();
321  if (status.ok()) {
322  break;
323  }
324  if (!retry_policy->OnFailure(status)) {
326  status.error_code(),
327  "Retry policy exhausted: " + status.error_message());
328  }
329  samples.clear();
332  }
333  return samples;
334 }
335 
341 
342  grpc::Status status;
345  &DataClient::ReadModifyWriteRow, request, "ReadModifyWriteRowRequest",
346  status);
347  if (!status.ok()) {
349  }
352 }
353 
360 
361  auto client = client_;
368  grpc::CompletionQueue* cq) {
370  },
371  std::move(request), cq)
373  -> StatusOr<Row> {
374  auto result = fut.get();
375  if (!result) {
376  return result.status();
377  }
380  });
381 }
382 
384  std::string row_key,
385  Filter filter) {
386  class AsyncReadRowHandler {
387  public:
// Start with a placeholder row (empty key, no cells); `OnRow()` overwrites it
// if a row is delivered.
388  AsyncReadRowHandler() : row_("", {}) {}
389 
// Return the future associated with `row_promise_`, the promise through which
// this handler publishes the result of the read.
390  future<StatusOr<std::pair<bool, Row>>> GetFuture() {
391  return row_promise_.get_future();
392  }
393 
// Row callback: store the received row in `row_` and record that a row
// arrived. The promise is intentionally left unsatisfied here (see below).
// NOTE(review): the ready `false` future presumably tells the caller that no
// further rows are wanted - confirm against the row-callback contract.
394  future<bool> OnRow(Row row) {
395  // assert(!row_received_);
396  row_ = std::move(row);
397  row_received_ = true;
398  // Don't satisfy the promise before `OnStreamFinished`.
399  //
400  // The `CompletionQueue`, which this object holds a reference to, should
401  // not be shut down before `OnStreamFinished` is called. To make sure of
402  // that, satisfying the `promise<>` is deferred until then - the user
403  // shouldn't shut down the `CompletionQueue` before this whole operation
404  // is done.
405  return make_ready_future(false);
406  }
407 
409  if (row_received_) {
410  // If we got a row we don't need to care about the stream status.
412  return;
413  }
414  if (status.ok()) {
415  row_promise_.set_value(std::make_pair(false, Row("", {})));
416  } else {
418  }
419  }
420 
421  private:
422  Row row_;
423  bool row_received_{};
425  };
426 
428  std::int64_t const rows_limit = 1;
431  [handler](Row row) { return handler->OnRow(std::move(row)); },
432  [handler](Status status) {
434  },
436  return handler->GetFuture();
437 }
438 
439 } // namespace BIGTABLE_CLIENT_NS
440 } // namespace bigtable
441 } // namespace cloud
442 } // namespace google
#define BIGTABLE_CLIENT_NS
Definition: version.h:22
bigtable::internal::UnaryClientUtils< InstanceAdminClient > ClientUtils
Shortcuts to avoid typing long names over and over.