有没有办法只读Kafka主题的特定字段?
我有个主题,比如说 person
使用架构 personSchema
. 架构包含许多字段,例如 id
, name
, address
, contact
, dateOfBirth
.
我只想 id
, name
以及 address
. 我该怎么做?
目前我´m使用apachebeam读取流,并打算随后将数据写入bigquery。我想用 Filter
但由于布尔返回类型,无法使其工作
在这里´这是我的代码:
Pipeline pipeline = Pipeline.create();
PCollection<KV<String, Person>> kafkaStreams =
pipeline
.apply("read streams", dataIO.readStreams(topic))
.apply(Filter.by(new SerializableFunction<KV<String, Person>, Boolean>() {
@Override
public Boolean apply(KV<String, Order> input) {
return input.getValue().get("address").equals(true);
}
}));
哪里 dataIO.readStreams
正在返回以下内容:
return KafkaIO.<String, Person>read()
.withTopic(topic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(PersonAvroDeserializer.class)
.withConsumerConfigUpdates(consumer)
.withoutMetadata();
我希望能为可能的解决办法提出建议。
2条答案
按热度按时间6yoyoihd1#
也可以通过创建新的
TableSchema
你自己只需要填写必填字段。稍后在编写bigquery时,可以将新创建的模式作为参数而不是旧的参数传递。我还应该提到,如果您要将avro记录转换为bigquery´s
TableRow
在某些时候,您可能也需要在那里执行一些检查。juud5qan2#
您可以使用ksqldb来实现这一点,它也可以直接与kafka connect一起工作,kafka connect有一个用于bigquery的sink连接器