java—使用ApacheBeam仅从kafka主题中获取字段的子集

5vf7fwbs  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(280)

有没有办法只读Kafka主题的特定字段?
我有个主题,比如说 person 使用架构 personSchema . 架构包含许多字段,例如 id , name , address , contact , dateOfBirth .
我只想 id , name 以及 address . 我该怎么做?
目前我´m使用apachebeam读取流,并打算随后将数据写入bigquery。我想用 Filter 但由于布尔返回类型,无法使其工作
在这里´这是我的代码:

Pipeline pipeline = Pipeline.create();
PCollection<KV<String, Person>> kafkaStreams =
                pipeline
               .apply("read streams", dataIO.readStreams(topic))
               .apply(Filter.by(new SerializableFunction<KV<String, Person>, Boolean>() {
                  @Override
                  public Boolean apply(KV<String, Order> input) {
                     return input.getValue().get("address").equals(true);
                  }
                  }));

哪里 dataIO.readStreams 正在返回以下内容:

return KafkaIO.<String, Person>read()
                .withTopic(topic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(PersonAvroDeserializer.class)
                .withConsumerConfigUpdates(consumer)
                .withoutMetadata();

我希望能为可能的解决办法提出建议。

6yoyoihd

6yoyoihd1#

也可以通过创建新的 TableSchema 你自己只需要填写必填字段。稍后在编写bigquery时,可以将新创建的模式作为参数而不是旧的参数传递。

TableSchema schema = new TableSchema();
        List<TableFieldSchema> tableFields = new ArrayList<TableFieldSchema>();

        TableFieldSchema id =
                new TableFieldSchema()
                        .setName("id")
                        .setType("STRING")
                        .setMode("NULLABLE");

        tableFields.add(id);

        schema.setFields(tableFields);
        return schema;

我还应该提到,如果您要将avro记录转换为bigquery´s TableRow 在某些时候,您可能也需要在那里执行一些检查。

juud5qan

juud5qan2#

您可以使用ksqldb来实现这一点,它也可以直接与kafka connect一起工作,kafka connect有一个用于bigquery的sink连接器

CREATE STREAM MY_SOURCE WITH (KAFKA_TOPIC='person', VALUE_FORMAT=AVRO');

CREATE STREAM FILTERED_STREAM AS SELECT id, name, address FROM MY_SOURCE;

CREATE SINK CONNECTOR SINK_BQ_01 WITH (
  'connector.class' = 'com.wepay.kafka.connect.bigquery.BigQuerySinkConnector',
  'topics' = 'FILTERED_STREAM',
…
);

相关问题