Tag: apache beam

Apache Beam – Bounding Data to BigQuery

We have what seems like a straightforward task.

Take data from Google Pub/Sub and push it into BigQuery using Apache Beam.

However, we hit a few issues, and here’s my high-level breakdown of what we discovered.

The Google Cloud Dataflow SDK becomes open source

To connect Pub/Sub to BigQuery you would originally use the Google Cloud Dataflow SDK; however, in early 2016 Google donated the SDK to Apache and it became the open source Apache Beam project. What does this mean? Old documentation on Google’s sites and new documentation on Apache’s sites.

When searching for answers you seem to hit a maze of old Google pages, various Apache Beam versions (with what appear to be breaking changes in some cases), and very few useful blog posts.

At the time of writing the latest documentation is here, however always confirm you’re looking at the latest version:-

https://beam.apache.org/documentation/sdks/javadoc/2.4.0/overview-summary.html

My kingdom for a usage!

The Apache documentation has a fair bit of information in it; however, it does not have any usage examples. So figuring out how to actually use some of the classes is a nightmare. I found that odd snippets of code provided me with answers more often than I’d like to admit.

Hopefully, going forward, there will be more usage examples in the Beam documentation.

Where are all the “Experts”?

When you go to Stack Overflow there are a lot of questions with no comments and no answers. It’s concerning, as it suggests that Apache Beam hasn’t been widely adopted by the general community yet.

Bounding

So the crux of our issue was this: we needed bounded data.

Bounded data in Beam terms is data that is essentially chunked into known sizes. We discovered pretty quickly that when reading from a file our data would push to BigQuery immediately, which is great, while reading from Pub/Sub would stream the data.

The reason being (obvious in hindsight!) that the data is coming from a subscription, so it’s pushing data whenever it likes and is not bounded.
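
To illustrate the difference, here’s a minimal sketch (the bucket, project and subscription names are placeholders, not our real ones). Reading from a file produces a bounded PCollection, while reading the same sort of data from Pub/Sub produces an unbounded one:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.values.PCollection;

Pipeline p = Pipeline.create(options);

// Reading from a file: the source has a known, finite size, so the
// resulting PCollection is bounded and BigQuery writes happen as load jobs.
PCollection<String> fromFile = p.apply("ReadFile",
    TextIO.read().from("gs://my-bucket/mydata.json"));

// Reading from a Pub/Sub subscription: messages arrive whenever they like,
// so the resulting PCollection is unbounded and writes are streamed.
PCollection<String> fromPubsub = p.apply("ReadSub",
    PubsubIO.readStrings().fromSubscription(
        "projects/my-project/subscriptions/my-subscription"));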

The Apache documentation talks a lot about various ways to make unbounded data bounded, such as setting a maximum number of records, setting a maximum read time, or just setting the IsBounded flag on the PCollection.

None of these worked for us.
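
For what it’s worth, you can ask a PCollection what the runner thinks of it; the Pub/Sub read below (names are placeholders) reports UNBOUNDED, and in our experience flipping that flag on its own doesn’t change how the data actually reaches BigQuery:

PCollection<String> messages = p.apply("ReadSub",
    PubsubIO.readStrings().fromSubscription(
        "projects/my-project/subscriptions/my-subscription"));

// Prints UNBOUNDED - the source decides boundedness, not the flag.
System.out.println(messages.isBounded());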

Why Bounded?

Streaming data into BigQuery comes at a cost. Pushing bounded data via load jobs is free, subject to the number of loads you are doing a day.

The Solution!

To get unbounded subscription data to be bounded and sent to BigQuery we ended up using the following code:-


p.begin()
    // Read Avro-encoded MyDataObject messages from the Pub/Sub topic.
    .apply("Input", PubsubIO.readAvros(MyDataObject.class).fromTopic("topicname"))
    // Convert each MyDataObject into a BigQuery TableRow.
    .apply("Transform", ParDo.of(new TransformData()))
    .apply("Write", BigQueryIO.writeTableRows()
        // Use batch load jobs instead of streaming inserts.
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        // Kick off a load job every 2 seconds.
        .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(2))
        // Mandatory when withTriggeringFrequency is set - see the note below.
        .withNumFileShards(1)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withSchema(schema)
        .to(table));
p.run().waitUntilFinish();

NOTE:- withNumFileShards is MANDATORY when using withTriggeringFrequency, however this isn’t documented anywhere. If you don’t include it you get an IllegalArgumentException with a message of “null”.
There is a JIRA for this: https://issues.apache.org/jira/browse/BEAM-3198
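
For completeness, the snippet above assumes a TransformData DoFn plus a schema and table definition along these lines (the field names and identifiers here are made up for illustration, not our real ones):

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.transforms.DoFn;
import java.util.Arrays;

// Converts each Pub/Sub message object into a BigQuery TableRow.
static class TransformData extends DoFn<MyDataObject, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        MyDataObject data = c.element();
        // Hypothetical fields, purely for illustration.
        c.output(new TableRow()
            .set("id", data.getId())
            .set("payload", data.getPayload()));
    }
}

// Destination table and schema referenced by BigQueryIO above.
String table = "my-project:my_dataset.my_table";
TableSchema schema = new TableSchema().setFields(Arrays.asList(
    new TableFieldSchema().setName("id").setType("STRING"),
    new TableFieldSchema().setName("payload").setType("STRING")));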