How do I write to BigQuery with BigQueryIO in Apache Beam?

nwsw7zdq posted on 2021-06-04 in Kafka

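I'm trying to build an Apache Beam pipeline that reads data from Kafka and writes it to BigQuery. I'm using the logic from here to filter out some coordinates: https://www.talend.com/blog/2018/08/07/developing-data-processing-job-using-apache-beam-streaming-pipeline/ TL;DR: messages in the topic have the format id,x,y. Filter out all messages where x>100 or y>100.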
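The filter rule described above can be sketched as a plain Java predicate. This is only an illustration under assumptions: the class name `CoordinateFilter`, the method `keep`, and the choice to drop malformed messages are hypothetical and are not taken from the linked blog post or from the `FilterObjectsByCoordinates` class used in the pipeline. In Beam, a check like this would sit inside the function passed to `Filter.by(...)`.

```java
// Hypothetical helper: decides whether an "id,x,y" message passes the coordinate filter.
final class CoordinateFilter {
    private CoordinateFilter() {}

    /** Returns true when the message should be kept, i.e. x <= maxX and y <= maxY. */
    static boolean keep(String message, long maxX, long maxY) {
        String[] parts = message.split(",");
        if (parts.length != 3) {
            return false; // assumption: messages that are not id,x,y are dropped
        }
        try {
            long x = Long.parseLong(parts[1].trim());
            long y = Long.parseLong(parts[2].trim());
            return x <= maxX && y <= maxY;
        } catch (NumberFormatException e) {
            return false; // assumption: non-numeric coordinates are dropped
        }
    }
}
```

With limits of 100 and 100, `keep("1,50,60", 100, 100)` keeps the message, while `keep("2,150,60", 100, 100)` filters it out.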
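I read the data, apply a couple of transforms, define the table schema, and then try to write to BigQuery. I'm not sure how to call the write method, possibly because of my limited knowledge of Java generics. I believe it should be a collection, but I can't figure it out.
Here is the pipeline code. Apologies if it comes across as a code dump; I just want to give the whole context: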

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getInputTopic())
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
        .apply(
            ParDo.of(
                new DoFn<KafkaRecord<Long, String>, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext processContext) {
                    KafkaRecord<Long, String> record = processContext.element();
                    processContext.output(record.getKV().getValue());
                  }
                }))
        .apply(
            "FilterValidCoords",
            Filter.by(new FilterObjectsByCoordinates(options.getCoordX(), options.getCoordY())))
        .apply(
            "ExtractPayload",
            ParDo.of(
                new DoFn<String, KV<String, String>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) throws Exception {
                    c.output(KV.of("filtered", c.element()));
                  }
                }));

    TableSchema tableSchema =
        new TableSchema()
            .setFields(
                ImmutableList.of(
                    new TableFieldSchema()
                        .setName("x_cord")
                        .setType("STRING")
                        .setMode("NULLABLE"),
                    new TableFieldSchema()
                        .setName("y_cord")
                        .setType("STRING")
                        .setMode("NULLABLE")));

    pipeline
        .apply(
            "Write data to BQ",
            BigQueryIO
                .<String, KV<String, String>>write() // I'm not sure how to call this method
                .optimizedWrites()
                .withSchema(tableSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
                .withMethod(FILE_LOADS)
                .to(new TableReference()
                    .setProjectId("prod-analytics-264419")
                    .setDatasetId("publsher")
                    .setTableId("beam_load_test")));

798qvoo8 #1

You want something like this:

    [..]
    pipeline.apply(BigQueryIO.writeTableRows()
        .to(String.format("%s.dataset.table", options.getProject()))
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withWriteDisposition(WRITE_APPEND)
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withSchema(getTableSchema()));
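Note that `BigQueryIO.writeTableRows()` consumes a `PCollection<TableRow>`, so the `KV<String, String>` elements produced by the "ExtractPayload" step in the question would need to be converted first, and the write would be applied to that collection rather than to `pipeline` itself. The conversion can be sketched in plain Java as follows. The class `CoordRowMapper` and its method `toRow` are hypothetical names for illustration, and a `Map` stands in for Beam's `TableRow` so the snippet runs without Beam on the classpath. In a real pipeline the same mapping would presumably build a row with `new TableRow().set("x_cord", ...)`, either in a `DoFn` placed before the write or in a function passed to `BigQueryIO.<KV<String, String>>write().withFormatFunction(...)`, which takes a single type parameter.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: maps an "id,x,y" payload to the columns of the question's schema.
final class CoordRowMapper {
    private CoordRowMapper() {}

    /** Returns column name -> value pairs for the two STRING columns x_cord and y_cord. */
    static Map<String, String> toRow(String payload) {
        String[] parts = payload.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected id,x,y but got: " + payload);
        }
        Map<String, String> row = new LinkedHashMap<>();
        row.put("x_cord", parts[1].trim()); // second field is x
        row.put("y_cord", parts[2].trim()); // third field is y
        return row;
    }
}
```

The column names match the `x_cord` and `y_cord` fields declared in the question's `TableSchema`; the id field is dropped because the schema has no column for it.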
