Add a batch write flow control example for Bigtable #9314
Conversation
Hello,
Please address the following questions:
- We ask for a single code sample per file. This code appears to show two samples. What does this code sample demonstrate?
- This code sample has no region tags. What documentation will use it?
- We do not host code samples without tests. Why are there no tests?
Pipeline p = Pipeline.create(options);

PCollection<Long> numbers = p.apply(generateLabel, GenerateSequence.from(0).to(numRows));

if (options.getUseCloudBigtableIo()) {
  System.out.println("Using CloudBigtableIO");
  PCollection<org.apache.hadoop.hbase.client.Mutation> mutations = numbers.apply(mutationLabel,
      ParDo.of(new CreateHbaseMutationFn(options.getBigtableColsPerRow(),
          options.getBigtableBytesPerCol())));

  mutations.apply(
      String.format("Write data to table %s via CloudBigtableIO", options.getBigtableTableId()),
      CloudBigtableIO.writeToTable(new CloudBigtableTableConfiguration.Builder()
          .withProjectId(options.getProject())
          .withInstanceId(options.getBigtableInstanceId())
          .withTableId(options.getBigtableTableId())
          .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
              "true")
          .withConfiguration(BigtableOptionsFactory.BIGTABLE_BULK_MAX_REQUEST_SIZE_BYTES,
              "1048576")
          .build()));
} else {
  System.out.println("Using BigtableIO");
  PCollection<KV<ByteString, Iterable<Mutation>>> mutations = numbers.apply(mutationLabel,
      ParDo.of(new CreateMutationFn(options.getBigtableColsPerRow(),
          options.getBigtableBytesPerCol())));

  BigtableIO.Write write = BigtableIO.write()
      .withProjectId(options.getProject())
      .withInstanceId(options.getBigtableInstanceId())
      .withTableId(options.getBigtableTableId())
      .withFlowControl(true); // This enables batch write flow control

  mutations.apply(
      String.format("Write data to table %s via BigtableIO", options.getBigtableTableId()),
      write);
}

p.run();
This block of code is hard to read. Since it is a code sample, it should be easy to understand. Please reformat it so that it reads as a series of steps, each calling the pipeline's apply method. See the dataflow-bigquery-read-tablerows sample as a reference.
This code sample shows how to enable the flow control feature. It has two parts because we have two different connectors, and they're configured differently. I was unsure whether it's worth duplicating the rest of the code to keep the sample simple. Please advise and I can split it.
We're going to write the doc that will use this code sample.
Yes, I'll add tests.
I was unsure whether it's worth duplicating the rest of the code to keep the sample simple. Please advise and I can split it.
It seems to me that both branches of the if/else block apply only two transformations, and the first transformations are very similar and hard to distinguish in the current version.
I suggest either looking into an option to abstract these transformations (by inheriting from a proper Beam class) or, at least, implementing each of them in a separate method with a meaningful name.
Another option would be to implement each pipeline in a separate class, from start to end.
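As a sketch of the "separate method" suggestion (the structure below is illustrative, not the actual PR code; the method name and the options type `BigtablePipelineOptions` are assumptions about the surrounding sample), each connector's branch could become one clearly named method so the main flow reads as steps:

```java
// Illustrative refactoring sketch: the BigtableIO branch moves into its own
// method, so the pipeline reads as three named apply() steps.
// BigtablePipelineOptions and buildBigtableIoSteps are assumed names.
static void buildBigtableIoSteps(Pipeline p, BigtablePipelineOptions options, long numRows) {
  p.apply("Generate row indexes", GenerateSequence.from(0).to(numRows))
      .apply("Create mutations", ParDo.of(new CreateMutationFn(
          options.getBigtableColsPerRow(), options.getBigtableBytesPerCol())))
      .apply("Write via BigtableIO", BigtableIO.write()
          .withProjectId(options.getProject())
          .withInstanceId(options.getBigtableInstanceId())
          .withTableId(options.getBigtableTableId())
          .withFlowControl(true)); // enables batch write flow control
}
```

A parallel method for the CloudBigtableIO branch would keep the two configurations side by side but clearly separated.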
We're going to write the doc that will use this code sample.
Please either acquire a region tag, or create a new tag using the devrel/ site, and add it to the code.
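For reference, region tags in Google Cloud samples are comment markers that delimit the snippet the documentation embeds. A minimal sketch, with a placeholder tag name (a real tag must be registered, not invented):

```java
// [START bigtable_sample_placeholder_tag]  // placeholder name, not an assigned tag
// ... the code to embed in the documentation goes here ...
// [END bigtable_sample_placeholder_tag]
```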
p.run();
Is this intended to run asynchronously? Please append a waitUntilFinish() call to the result of run().
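The requested change would replace the bare run() with a blocking call, roughly:

```java
// Run the pipeline and block until it completes, so the sample binary
// exits only after all writes have finished.
p.run().waitUntilFinish();
```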
So far our example jobs all run asynchronously; is it better practice to run synchronously? I'm happy to learn the reasoning behind the practice.
This is a code sample. It is good practice to have the binary terminate after the sampled behavior is complete.
If there are practical differences between implementing batch flow control asynchronously and synchronously, consider creating multiple code samples that demonstrate both behaviors.
Includes using BigtableIO and CloudBigtableIO
Description
Add a batch write flow control example for Bigtable
Checklist
- pom.xml parent set to latest shared-configuration
- mvn clean verify passes (required)
- mvn -P lint checkstyle:check passes (required)
- mvn -P lint clean compile pmd:cpd-check spotbugs:check passes (advisory only)