我在Dataflow中使用STORAGE_WRITE_API
方法通过批处理管道将数据写入BigQuery。这会导致问题,有时会卡住,无法将数据加载到Biquery中。它适用于小表,但适用于大表,它开始出现问题,但没有引发任何错误。
我用Default
write方法尝试了相同的代码,它可以在小表和大表中正常运行。
所以我想知道BigQuery的STORAGE_WRITE_API
方法是否推荐用于批处理管道?
rows.apply(BigQueryIO.writeTableRows()
.withJsonSchema(tableJsonSchema)
.to(String.format("project:SampleDataset.%s", tableName))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
);
1条答案
按热度按时间u5rb5r591#
根据documentation,
STORAGE_WRITE_API
推荐用于batch
和streaming
:BigQuery存储写入API是用于BigQuery的统一数据接收API。它将流式接收和批处理加载组合到单个高性能API中。您可以使用存储写入API将记录真实的流式传输到BigQuery中,或者批处理任意数量的记录并在单个原子操作中提交它们。
使用存储写入API“精确一次"传送语义的优点。存储写入API通过使用流偏移量支持精确一次语义。与tabledata.insertAll方法不同,如果客户机在附加记录时提供流偏移量,则存储写入API绝不会写入流中具有相同偏移量的两条消息。
流级事务。可以将数据写入流,然后将数据作为单个事务提交。如果提交操作失败,可以安全地重试该操作。
跨流的事务处理。多个工作进程可以创建自己的流来独立处理数据。当所有工作进程都完成后,可以将所有流作为一个事务处理提交。
高效协议。存储写入API比旧的insertAll方法更高效,因为它使用gRPC流而不是HTTP上的REST。存储写入API还支持协议缓冲区形式的二进制格式,这是比JSON更高效的线格式。写入请求是异步的,顺序有保证。
方案更新检测。如果基础表方案在客户机流式传输时发生更改,存储写入API将通知客户机。客户机可以决定是使用更新的方案重新连接,还是继续写入现有连接。
更低的成本。存储写入API的成本比旧的insertAll流API低得多。此外,您每月可以免费接收多达2TB的数据。
批处理和流处理有很多优点。
对于批处理模式,它比
BATCH_LOAD
方法效率更高。您需要检查所有可能的日志,以了解这种奇怪的行为:
dataflow_step
上使用筛选器的云日志记录如果可能,请使用最新的apache beam版本
2.43.0