如何在apachebeam中用bigquery io编写bigquery?

nwsw7zdq  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(439)

我正在尝试建立一个apachebeam管道,它使用apachebeam从kafka读取数据并写入bigquery。我用这里的逻辑过滤出一些坐标:https://www.talend.com/blog/2018/08/07/developing-data-processing-job-using-apache-beam-streaming-pipeline/ tldr:主题中的消息的格式为id,x,y。过滤掉x>100或y>100的所有消息
我读取数据,进行两次转换,然后定义表模式,然后尝试写入bigquery。我不太清楚如何调用write方法。可能是因为缺乏java泛型知识。我认为这应该是一个集合,但我想不出来。
以下是管道代码处理如果它被认为是代码转储,我只想给出整个上下文:

Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
                KafkaIO.<Long, String>read()
                        .withBootstrapServers(options.getBootstrap())
                        .withTopic(options.getInputTopic())
                        .withKeyDeserializer(LongDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class))
        .apply(
                ParDo.of(
                        new DoFn<KafkaRecord<Long, String>, String>() {
                            @ProcessElement
                            public void processElement(ProcessContext processContext) {
                                KafkaRecord<Long, String> record = processContext.element();
                                processContext.output(record.getKV().getValue());
                            }
                        }))
        .apply(
                "FilterValidCoords",
                Filter.by(new FilterObjectsByCoordinates(options.getCoordX(), options.getCoordY())))
        .apply(
                "ExtractPayload",
                ParDo.of(
                        new DoFn<String, KV<String, String>>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) throws Exception {
                                c.output(KV.of("filtered", c.element()));
                            }
                        }));

        TableSchema tableSchema =
        new TableSchema()
                .setFields(
                        ImmutableList.of(
                                new TableFieldSchema()
                                        .setName("x_cord")
                                        .setType("STRING")
                                        .setMode("NULLABLE"),
                        new TableFieldSchema()
                                .setName("y_cord")
                                .setType("STRING")
                                .setMode("NULLABLE")

                        ));
        pipeline
                .apply(
                "Write data to BQ",
                BigQueryIO
                        .<String, KV<String, String>>write() //I'm not sure how to call this method
                        .optimizedWrites()
                        .withSchema(tableSchema)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
                        .withMethod(FILE_LOADS)
                        .to(new TableReference()
                                .setProjectId("prod-analytics-264419")
                                .setDatasetId("publsher")
                                .setTableId("beam_load_test"))
        );
798qvoo8

798qvoo81#

你想要这样的东西:

[..] 
pipeline.apply(BigQueryIO.writeTableRows()
        .to(String.format("%s.dataset.table", options.getProject()))
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withWriteDisposition(WRITE_APPEND)
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withSchema(getTableSchema()));

相关问题