Google Cloud Bigtable C++ Client  1.1.0
A C++ Client Library for Google Cloud Bigtable
mutation_batcher.h
Go to the documentation of this file.
1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_MUTATION_BATCHER_H_
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_MUTATION_BATCHER_H_
17 
18 #include "google/cloud/bigtable/client_options.h"
19 #include "google/cloud/bigtable/completion_queue.h"
20 #include "google/cloud/bigtable/mutations.h"
21 #include "google/cloud/bigtable/table.h"
22 #include "google/cloud/bigtable/version.h"
23 #include "google/cloud/internal/make_unique.h"
24 #include "google/cloud/status.h"
25 #include <google/bigtable/v2/bigtable.grpc.pb.h>
26 #include <deque>
27 #include <functional>
28 #include <memory>
29 #include <queue>
30 
31 namespace google {
32 namespace cloud {
33 namespace bigtable {
34 inline namespace BIGTABLE_CLIENT_NS {
35 /**
36  * Objects of this class pack single row mutations into bulk mutations.
37  *
38  * In order to maximize throughput when applying a lot of mutations to Cloud
39  * Bigtable, one should pack the mutations into `BulkMutation`s. This class helps
40  * in doing so. Create a `MutationBatcher` and use
41  * `MutationBatcher::AsyncApply()` to apply a large stream of mutations to the
42  * same `Table`. Objects of this class will efficiently create batches of
43  * `SingleRowMutations` and maintain multiple batches "in flight".
44  *
45  * This class also offers an easy-to-use flow control mechanism to avoid
46  * unbounded growth in its internal buffers.
47  *
48  * Applications must provide a `CompletionQueue` to (asynchronously) execute
49  * these operations. The application is responsible for executing the
50  * `CompletionQueue` event loop in one or more threads.
51  */
53  public:
54  /// Configuration for `MutationBatcher`.
55  struct Options {
56  Options();
57 
58  /// A single RPC will not have more mutations than this.
59  Options& SetMaxMutationsPerBatch(size_t max_mutations_per_batch_arg) {
60  max_mutations_per_batch = max_mutations_per_batch_arg;
61  return *this;
62  }
63 
64  /// Sum of mutations' sizes in a single RPC will not be larger than this.
65  Options& SetMaxSizePerBatch(size_t max_size_per_batch_arg) {
66  max_size_per_batch = max_size_per_batch_arg;
67  return *this;
68  }
69 
70  /// There will be no more RPCs outstanding (except for retries) than this.
71  Options& SetMaxBatches(size_t max_batches_arg) {
72  max_batches = max_batches_arg;
73  return *this;
74  }
75 
76  /// MutationBatcher will at most admit mutations of this total size.
77  Options& SetMaxOutstandingSize(size_t max_outstanding_size_arg) {
78  max_outstanding_size = max_outstanding_size_arg;
79  return *this;
80  }
81 
84  size_t max_batches;
86  };
87 
88  explicit MutationBatcher(Table table, Options options = Options())
89  : table_(std::move(table)),
90  options_(options),
91  num_outstanding_batches_(),
92  outstanding_size_(),
93  num_requests_pending_(),
95 
96  /**
97  * Asynchronously apply mutation.
98  *
99  * The mutation will most likely be batched together with others to optimize
100  * for throughput. As a result, latency is likely to be worse than
101  * `Table::AsyncApply`.
102  *
103  * @param mut the mutation. Note that this function takes ownership
104  * of (and then discards) the data in the mutation. In general, a
105  * `SingleRowMutation` can be used to modify and/or delete
106  * multiple cells, across different columns and column families.
107  * @param cq the completion queue that will execute the asynchronous
108  * calls, the application must ensure that one or more threads are
109  * blocked on `cq.Run()`.
110  *
111  * @return *admission* and *completion* futures
112  *
113  * The *completion* future will report the mutation's status once it
114  * completes.
115  *
116  * The *admission* future should be used for flow control. In order to bound
117  * the memory used by `MutationBatcher`, one should not submit more
118  * mutations before the *admission* future is satisfied. Note that while the
119  * future is often already satisfied when the function returns, applications
120  * should not assume that this is always the case.
121  *
122  * One should not make assumptions about which future will be satisfied first.
123  *
124  * This quasi-synchronous example shows the intended use:
125  * @code
126  * bigtable::MutationBatcher batcher(bigtable::Table(...args...));
127  * bigtable::CompletionQueue cq;
128  * std::thread cq_runner([&cq]() { cq.Run(); });
129  *
130  * while (HasMoreMutations()) {
131  * auto admission_completion = batcher.AsyncApply(cq, GenerateMutation());
132  * auto& admission_future = admission_completion.first;
133  * auto& completion_future = admission_completion.second;
134  * completion_future.then([](future<Status> completion_status) {
135  * // handle mutation completion asynchronously
136  * });
137  * // Potentially slow down submission not to make buffers in
138  * // MutationBatcher grow unbounded.
139  * admission_future.get();
140  * }
141  * // Wait for all mutations to complete
142  * batcher.AsyncWaitForNoPendingRequests().get();
143  * cq.Shutdown();
144  * cq_runner.join();
145  * @endcode
146  */
147  std::pair<future<void>, future<Status>> AsyncApply(CompletionQueue& cq,
148  SingleRowMutation mut);
149 
150  /**
151  * Asynchronously wait until all submitted mutations complete.
152  *
153  * @return a future which will be satisfied once all mutations submitted
154  * before calling this function finish; if there are no such operations,
155  * the returned future is already satisfied.
156  */
158 
159  private:
160  using CompletionPromise = promise<Status>;
161  using AdmissionPromise = promise<void>;
162  using NoMorePendingPromise = promise<void>;
163  struct Batch;
164 
165  /**
166  * This structure represents a single mutation before it is admitted.
167  */
168  struct PendingSingleRowMutation {
169  PendingSingleRowMutation(SingleRowMutation mut_arg,
170  CompletionPromise completion_promise,
171  AdmissionPromise admission_promise);
172 
173  SingleRowMutation mut;
174  size_t num_mutations;
175  size_t request_size;
176  CompletionPromise completion_promise;
177  AdmissionPromise admission_promise;
178  };
179 
180  /**
181  * A mutation that has been sent to the Cloud Bigtable service.
182  *
183  * We need to save the `CompletionPromise` associated with each mutation.
184  * Because only failures are reported, we need to track whether the mutation
185  * is "done", so we can simulate a success report.
186  */
187  struct MutationData {
188  explicit MutationData(PendingSingleRowMutation pending)
189  : completion_promise(std::move(pending.completion_promise)),
190  done(false) {}
191  CompletionPromise completion_promise;
192  bool done;
193  };
194 
195  /**
196  * This class represents a single batch of mutations sent in one RPC.
197  *
198  * Objects of this class hold the accumulated mutations, their completion
199  * promises and basic statistics.
200  *
201  * Objects of this class don't need separate synchronization.
202  * There are 2 important stages of these objects' lifecycle: when mutations
203  * are accumulated and when the batch is worked on by `AsyncBulkApply`. In the
204  * first stage, `MutationBatcher`'s synchronization ensures that its data is
205  * not accessed from multiple threads. In the second stage we rely on the fact
206  * that `AsyncBulkApply` invokes the callbacks serially. This in turn
207  * relies on the fact that `CompletionQueue` invokes callbacks from a
208  * streaming response in sequence and that `AsyncRetryOp` doesn't schedule
209  * another attempt before invoking callbacks for the previous one.
210  */
211  struct Batch {
212  Batch() : num_mutations(), requests_size() {}
213 
214  size_t num_mutations;
215  size_t requests_size;
216  BulkMutation requests;
217  std::vector<MutationData> mutation_data;
218  };
219 
220  /// Check if a mutation doesn't exceed allowed limits.
221  grpc::Status IsValid(PendingSingleRowMutation& mut) const;
222 
223  /**
224  * Check whether there is space for the passed mutation in the currently
225  * constructed batch.
226  */
227  bool HasSpaceFor(PendingSingleRowMutation const& mut) const;
228 
229  /**
230  * Check if one can append a mutation to the currently constructed batch.
231  * Even if there is space for the mutation, we shouldn't append mutations if
232  * some others have not been admitted yet.
233  */
234  bool CanAppendToBatch(PendingSingleRowMutation const& mut) const {
235  // If some mutations are already subject to flow control, don't admit any
236  // new ones, even if there's space for them. Otherwise we might starve big
237  // mutations.
238  return pending_mutations_.empty() && HasSpaceFor(mut);
239  }
240 
241  /**
242  * Send the currently constructed batch if there are not too many outstanding
243  * already. If there are no mutations in the batch, it's a noop.
244  */
245  bool FlushIfPossible(CompletionQueue cq);
246 
247  /// Handle a completed batch.
248  void OnBulkApplyDone(CompletionQueue cq, MutationBatcher::Batch batch,
249  std::vector<FailedMutation> failed);
250 
251  /**
252  * Try to move mutations waiting in `pending_mutations_` to the currently
253  * constructed batch.
254  *
255  * @return the admission promises of the newly admitted mutations.
256  */
257  std::vector<MutationBatcher::AdmissionPromise> TryAdmit(CompletionQueue& cq);
258 
259  /**
260  * Append mutation `mut` to the currently constructed batch.
261  */
262  void Admit(PendingSingleRowMutation mut);
263 
264  /**
265  * Satisfies passed admission promises and potentially the promises of no more
266  * pending requests. Unlocks `lk`.
267  */
268  void SatisfyPromises(std::vector<AdmissionPromise>,
269  std::unique_lock<std::mutex>& lk);
270 
271  std::mutex mu_;
272  Table table_;
273  Options options_;
274 
275  /// Num batches sent but not completed.
276  size_t num_outstanding_batches_;
277  /// Size of admitted but uncompleted mutations.
278  size_t outstanding_size_;
279  /// Number of uncompleted SingleRowMutations (including those not yet admitted).
280  size_t num_requests_pending_;
281 
282  /// Currently constructed batch of mutations.
283  std::shared_ptr<Batch> cur_batch_;
284 
285  /**
286  * These are the mutations which have not been admitted yet. If the user is
287  * properly reacting to `admission_promise`s, there should be very few of
288  * these (likely no more than one).
289  */
290  std::queue<PendingSingleRowMutation> pending_mutations_;
291 
292  /**
293  * The list of promises made to this point.
294  *
295  * These promises are handed out by `AsyncWaitForNoPendingRequests()` and are
296  * satisfied once there are no more pending requests.
297  */
298  std::vector<NoMorePendingPromise> no_more_pending_promises_;
299 };
300 
301 } // namespace BIGTABLE_CLIENT_NS
302 } // namespace bigtable
303 } // namespace cloud
304 } // namespace google
305 
306 #endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_MUTATION_BATCHER_H_
std::pair< future< void >, future< Status > > AsyncApply(CompletionQueue &cq, SingleRowMutation mut)
Asynchronously apply mutation.
Options & SetMaxSizePerBatch(size_t max_size_per_batch_arg)
Sum of mutations' sizes in a single RPC will not be larger than this.
Options & SetMaxMutationsPerBatch(size_t max_mutations_per_batch_arg)
A single RPC will not have more mutations than this.
Options & SetMaxOutstandingSize(size_t max_outstanding_size_arg)
MutationBatcher will at most admit mutations of this total size.
future< void > AsyncWaitForNoPendingRequests()
Asynchronously wait until all submitted mutations complete.
Options & SetMaxBatches(size_t max_batches_arg)
There will be no more RPCs outstanding (except for retries) than this.
#define BIGTABLE_CLIENT_NS
Definition: version.h:22
Objects of this class pack single row mutations into bulk mutations.
MutationBatcher(Table table, Options options=Options())
Contains all the Cloud Bigtable C++ client APIs.