Google Cloud Bigtable C++ Client  1.1.0
A C++ Client Library for Google Cloud Bigtable
mutation_batcher.cc
Go to the documentation of this file.
1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/bigtable/mutation_batcher.h"
16 #include "google/cloud/bigtable/internal/client_options_defaults.h"
17 #include "google/cloud/grpc_utils/grpc_error_delegate.h"
18 #include <sstream>
19 
20 namespace google {
21 namespace cloud {
22 namespace bigtable {
23 inline namespace BIGTABLE_CLIENT_NS {
25  : // Cloud Bigtable doesn't accept more than this.
27  // Let's make the default slightly smaller, so that overheads or
28  // miscalculations don't tip us over.
29  max_size_per_batch(BIGTABLE_CLIENT_DEFAULT_MAX_MESSAGE_LENGTH * 9LL / 10),
30  max_batches(8),
31  max_outstanding_size(BIGTABLE_CLIENT_DEFAULT_MAX_MESSAGE_LENGTH * 6) {}
32 
34  CompletionQueue& cq, SingleRowMutation mut) {
35  AdmissionPromise admission_promise;
36  CompletionPromise completion_promise;
37  auto res = std::make_pair(admission_promise.get_future(),
38  completion_promise.get_future());
39  PendingSingleRowMutation pending(std::move(mut),
40  std::move(completion_promise),
41  std::move(admission_promise));
42  std::unique_lock<std::mutex> lk(mu_);
43 
44  grpc::Status mutation_status = IsValid(pending);
45  if (!mutation_status.ok()) {
46  lk.unlock();
47  // Destroy the mutation before satisfying the admission promise so that we
48  // can limit the memory usage.
49  pending.mut.Clear();
50  pending.completion_promise.set_value(
51  grpc_utils::MakeStatusFromRpcError(mutation_status));
52  // No need to consider no_more_pending_promises because this operation
53  // didn't lower the number of pending operations.
54  pending.admission_promise.set_value();
55  return res;
56  }
57  ++num_requests_pending_;
58 
59  if (!CanAppendToBatch(pending)) {
60  pending_mutations_.push(std::move(pending));
61  return res;
62  }
63  std::vector<AdmissionPromise> admission_promises_to_satisfy;
64  admission_promises_to_satisfy.emplace_back(
65  std::move(pending.admission_promise));
66  Admit(std::move(pending));
67  FlushIfPossible(cq);
68  SatisfyPromises(std::move(admission_promises_to_satisfy), lk);
69  return res;
70 }
71 
73  std::unique_lock<std::mutex> lk(mu_);
74  if (num_requests_pending_ == 0) {
75  return make_ready_future();
76  }
77  no_more_pending_promises_.emplace_back();
78  return no_more_pending_promises_.back().get_future();
79 }
80 
81 MutationBatcher::PendingSingleRowMutation::PendingSingleRowMutation(
82  SingleRowMutation mut_arg, CompletionPromise completion_promise,
83  AdmissionPromise admission_promise)
84  : mut(std::move(mut_arg)),
85  completion_promise(std::move(completion_promise)),
86  admission_promise(std::move(admission_promise)) {
87  ::google::bigtable::v2::MutateRowsRequest::Entry tmp;
88  mut.MoveTo(&tmp);
89  // This operation might not be cheap, so let's cache it.
90  request_size = tmp.ByteSizeLong();
91  num_mutations = static_cast<std::size_t>(tmp.mutations_size());
92  mut = SingleRowMutation(std::move(tmp));
93 }
94 
95 grpc::Status MutationBatcher::IsValid(PendingSingleRowMutation& mut) const {
96  // Objects of this class need to be aware of the maximum allowed number of
97  // mutations in a batch because it should not pack more. If we have this
98  // knowledge, we might as well simplify everything and not admit larger
99  // mutations.
100  if (mut.num_mutations > options_.max_mutations_per_batch) {
101  std::stringstream stream;
102  stream << "Too many (" << mut.num_mutations
103  << ") mutations in a SingleRowMutations request. "
104  << options_.max_mutations_per_batch << " is the limit.";
105  return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, stream.str());
106  }
107  if (mut.num_mutations == 0) {
108  return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
109  "Supplied SingleRowMutations has no entries");
110  }
111  if (mut.request_size > options_.max_size_per_batch) {
112  std::stringstream stream;
113  stream << "Too large (" << mut.request_size
114  << " bytes) mutation in a SingleRowMutations request. "
115  << options_.max_size_per_batch << " bytes is the limit.";
116  return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, stream.str());
117  }
118  return grpc::Status();
119 }
120 
121 bool MutationBatcher::HasSpaceFor(PendingSingleRowMutation const& mut) const {
122  return outstanding_size_ + mut.request_size <=
123  options_.max_outstanding_size &&
124  cur_batch_->requests_size + mut.request_size <=
125  options_.max_size_per_batch &&
126  cur_batch_->num_mutations + mut.num_mutations <=
127  options_.max_mutations_per_batch;
128 }
129 
130 bool MutationBatcher::FlushIfPossible(CompletionQueue cq) {
131  if (cur_batch_->num_mutations > 0 &&
132  num_outstanding_batches_ < options_.max_batches) {
133  ++num_outstanding_batches_;
134 
135  auto batch = std::make_shared<Batch>();
136  cur_batch_.swap(batch);
137  table_.AsyncBulkApply(std::move(batch->requests), cq)
138  .then([this, cq,
139  batch](future<std::vector<FailedMutation>> failed) mutable {
140  OnBulkApplyDone(std::move(cq), std::move(*batch), failed.get());
141  });
142  return true;
143  }
144  return false;
145 }
146 
147 void MutationBatcher::OnBulkApplyDone(CompletionQueue cq,
148  MutationBatcher::Batch batch,
149  std::vector<FailedMutation> failed) {
150  // First process all the failures, marking the mutations as done after
151  // processing them.
152  for (auto const& f : failed) {
153  int const idx = f.original_index();
154  if (idx < 0 ||
155  static_cast<std::size_t>(idx) >= batch.mutation_data.size()) {
156  // This is a bug on the server or the client, either terminate (when
157  // -fno-exceptions is set) or throw an exception.
158  std::ostringstream os;
159  os << "Index " << idx << " is out of range [0,"
160  << batch.mutation_data.size() << ")";
161  google::cloud::internal::ThrowRuntimeError(std::move(os).str());
162  }
163  MutationData& data = batch.mutation_data[idx];
164  data.completion_promise.set_value(f.status());
165  data.done = true;
166  }
167  // Any remaining mutations are treated as successful.
168  for (auto& data : batch.mutation_data) {
169  if (!data.done) {
170  data.completion_promise.set_value(Status());
171  data.done = true;
172  }
173  }
174  auto const num_mutations = batch.mutation_data.size();
175  batch.mutation_data.clear();
176 
177  std::unique_lock<std::mutex> lk(mu_);
178  outstanding_size_ -= batch.requests_size;
179  num_requests_pending_ -= num_mutations;
180  num_outstanding_batches_--;
181  SatisfyPromises(TryAdmit(cq), lk); // unlocks the lock
182 }
183 
184 std::vector<MutationBatcher::AdmissionPromise> MutationBatcher::TryAdmit(
185  CompletionQueue& cq) {
186  // Defer satisfying promises until we release the lock.
187  std::vector<AdmissionPromise> admission_promises;
188 
189  do {
190  while (!pending_mutations_.empty() &&
191  HasSpaceFor(pending_mutations_.front())) {
192  auto& mut = pending_mutations_.front();
193  admission_promises.emplace_back(std::move(mut.admission_promise));
194  Admit(std::move(mut));
195  pending_mutations_.pop();
196  }
197  } while (FlushIfPossible(cq));
198  return admission_promises;
199 }
200 
201 void MutationBatcher::Admit(PendingSingleRowMutation mut) {
202  outstanding_size_ += mut.request_size;
203  cur_batch_->requests_size += mut.request_size;
204  cur_batch_->num_mutations += mut.num_mutations;
205  cur_batch_->requests.emplace_back(std::move(mut.mut));
206  cur_batch_->mutation_data.emplace_back(MutationData(std::move(mut)));
207 }
208 
209 void MutationBatcher::SatisfyPromises(
210  std::vector<AdmissionPromise> admission_promises,
211  std::unique_lock<std::mutex>& lk) {
212  std::vector<NoMorePendingPromise> no_more_pending_promises;
213  if (num_requests_pending_ == 0 && num_outstanding_batches_ == 0) {
214  // We should wait not only on num_requests_pending_ being zero but also on
215  // num_outstanding_batches_ because we want to allow the user to kill the
216  // completion queue after this promise is fulfilled. Otherwise, the user can
217  // destroy the completion queue while the last batch is still being
218  // processed - we've had this bug (#2140).
219  no_more_pending_promises_.swap(no_more_pending_promises);
220  }
221  lk.unlock();
222 
223  // Inform the user that we've admitted these mutations and there might be some
224  // space in the buffer finally.
225  for (auto& promise : admission_promises) {
226  promise.set_value();
227  }
228  for (auto& promise : no_more_pending_promises) {
229  promise.set_value();
230  }
231 }
232 
233 } // namespace BIGTABLE_CLIENT_NS
234 } // namespace bigtable
235 } // namespace cloud
236 } // namespace google
std::pair< future< void >, future< Status > > AsyncApply(CompletionQueue &cq, SingleRowMutation mut)
Asynchronously apply mutation.
future< void > make_ready_future()
future< void > AsyncWaitForNoPendingRequests()
Asynchronously wait until all submitted mutations complete.
#define BIGTABLE_CLIENT_NS
Definition: version.h:22
Objects of this class pack single row mutations into bulk mutations.
Contains all the Cloud Bigtable C++ client APIs.