Converting protobuf to BigQuery in Java

vecaoik1, posted 2021-07-06 in Java

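We use protobuf together with GCP's Pub/Sub and Dataflow. A single proto file defines both the data sent to Pub/Sub and the BigQuery schema.
publisher -(send proto)-> Pub/Sub -> Dataflow -(write)-> BigQuery
Sometimes the Dataflow job makes cosmetic changes, but mostly it just copies fields from the protobuf into BigQuery.
My question is: is there a way to automatically convert a protobuf model into a BigQuery TableRow?
Below is the simplified Dataflow code. I would like to eliminate most of the code in the ProtoToTableRow class: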

    public class MyPipeline {
      public static void main(String[] args) {
        // pipeline, subscription and table are set up elsewhere (omitted in this simplified code).
        PCollection<Core.MyProtoObject> events = pipeline.apply("ReadEvents",
            PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));
        events.apply("ConvertToTableRows", ParDo.of(new ProtoToTableRow()))
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withExtendedErrorInfo()
                .to(table));
      }
    }

    // I want this class to be super thin!
    class ProtoToTableRow extends DoFn<Core.MyProtoObject, TableRow> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Core.Foo foo = c.element().getFoo();
        // Copy each proto field into a nested TableRow by hand.
        TableRow fooRow = new TableRow()
            .set("id", foo.getId())
            .set("bar", foo.getBar())
            .set("baz", foo.getBaz());
        // similar code repeated for 100s of lines
        TableRow row = new TableRow()
            .set("foo", fooRow); // nest the converted row, not the raw proto message
        c.output(row);
      }
    }
Answer #1, by 5cnsuln7

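You can do this in a pretty neat way. Beam provides schema inference for various kinds of classes, including JavaBeans, AutoValue classes, and protocol buffers.
For your pipeline there is no need to convert to TableRow at all; you can do the following: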

    // ProtoMessageSchema lives in org.apache.beam.sdk.extensions.protobuf.
    pipeline.getSchemaRegistry().registerSchemaProvider(
        Core.MyProtoObject.class, new ProtoMessageSchema());
    PCollection<Core.MyProtoObject> events = pipeline.apply("ReadEvents",
        PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));
    events.apply("WriteToBigQuery", BigQueryIO.<Core.MyProtoObject>write()
        .useBeamRows()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withExtendedErrorInfo()
        .to(table));

Note the useBeamRows() option on BigQueryIO.write(): it is what turns on the automatic conversion.
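
To make the idea of schema inference more concrete, here is a minimal sketch that uses only JDK reflection (java.beans.Introspector) to turn a getter-based object into a nested map, roughly the shape a TableRow would have. It is an illustration only and not how Beam or ProtoMessageSchema is implemented. The Event and Foo classes, their field types, and the toRow helper are all hypothetical stand-ins for the classes in the question.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.LinkedHashMap;
import java.util.Map;

public class BeanToRowSketch {

    // Hypothetical stand-in for Core.Foo; the field types are assumptions.
    public static class Foo {
        private final long id;
        private final String bar;
        private final boolean baz;

        public Foo(long id, String bar, boolean baz) {
            this.id = id;
            this.bar = bar;
            this.baz = baz;
        }

        public long getId() { return id; }
        public String getBar() { return bar; }
        public boolean getBaz() { return baz; }
    }

    // Hypothetical stand-in for Core.MyProtoObject.
    public static class Event {
        private final Foo foo;

        public Event(Foo foo) { this.foo = foo; }

        public Foo getFoo() { return foo; }
    }

    // Values that are copied as-is instead of being expanded into a nested map.
    private static boolean isLeaf(Object value) {
        return value == null
            || value instanceof String
            || value instanceof Number
            || value instanceof Boolean
            || value instanceof Character;
    }

    // Walks the readable properties of a bean and builds a nested map keyed by
    // property name, so no per-field .set(...) code has to be written by hand.
    public static Map<String, Object> toRow(Object bean)
            throws IntrospectionException, ReflectiveOperationException {
        Map<String, Object> row = new LinkedHashMap<>();
        // Object.class as the stop class keeps getClass() out of the result.
        BeanInfo info = Introspector.getBeanInfo(bean.getClass(), Object.class);
        for (PropertyDescriptor property : info.getPropertyDescriptors()) {
            if (property.getReadMethod() == null) {
                continue; // skip write-only properties
            }
            Object value = property.getReadMethod().invoke(bean);
            row.put(property.getName(), isLeaf(value) ? value : toRow(value));
        }
        return row;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> row = toRow(new Event(new Foo(42L, "hello", true)));
        System.out.println(row);
    }
}
```

The sketch handles only scalar and nested-object properties; repeated fields, enums, timestamps and the like would need extra cases, which is part of why relying on Beam's built-in schema support is attractive.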
