我们将protobuf与gcp的pubsub和dataflow结合使用。我们用一个proto文件定义发送到pubsub的数据和bigquery模式。
publisher-(send proto)->pubsub->dataflow-(write)->bigquery
有时数据流会做一些修饰性的改变,但它主要是将protobuf中的字段复制到bigquery中。
我的问题是,有没有一种方法可以自动将protobuf模型转换为bigquery的tablerow?
下面是简化的数据流代码。我想消除大部分的代码 ProtoToTableRow
班级:
public class MyPipeline {
public static void main(String[] args) {
events = pipeline.apply("ReadEvents",
PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));
events.apply("ConvertToTableRows", ParDo.of(new ProtoToTableRow()))
.apply("WriteToBigQuery", BigQueryIO.writeTableRows()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withExtendedErrorInfo()
.to(table));
}
}
// I want this class to be super thin!
class ProtoToTableRow extends DoFn<Core.MyProtoObject, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) {
Core.Foo foo = c.element().getFoo();
TableRow fooRow = new TableRow()
.set("id", foo.getId())
.set("bar", foo.getBar())
.set("baz", foo.getBaz());
// similar code repeated for 100s of lines
TableRow row = new TableRow()
.set("foo", foo)
c.output(row);
}
}
1条答案
按热度按时间5cnsuln71#
你可以用一种很酷的方式来完成。beam为各种类提供了模式推理方法,包括javabean、autovalue类以及协议缓冲区。
对于管道,不需要转换为tablerow,可以执行以下操作:
注意
useBeamRows
中的参数BigQueryIO.write
-这将使用自动转换。