Kafka: Only Map objects supported in absence of schema for record conversion to BigQuery format

mcdcgff0 · posted 2023-02-15 in Apache

I am streaming data from Postgres through Kafka into BigQuery. Most tables in PG have a primary key, so most tables/topics have both an Avro key schema and a value schema, and those load into BigQuery fine.
I do have a few tables without a PK, and therefore without an Avro key schema.
When I create a sink connector for those tables, the connector fails with Caused by: com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Only Map objects supported in absence of schema for record conversion to BigQuery format.
If I remove the 'key.converter' config, I instead get a 'Top-level Kafka Connect schema must be of type 'struct'' error.
What should I do?
The connector config is below for reference:

{
"project": "staging",
"defaultDataset": "data_lake",
"keyfile": "<redacted>",
"keySource": "JSON",
"sanitizeTopics": "true",
"kafkaKeyFieldName": "_kid",
"autoCreateTables": "true",
"allowNewBigQueryFields": "true",
"upsertEnabled": "false",
"bigQueryRetry": "5",
"bigQueryRetryWait": "120000",
"bigQueryPartitionDecorator": "false",
"name": "hd-sink-bq",
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "<redacted>",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "<redacted>",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<redacted>",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "<redacted>",
"topics": "public.event_issues",
"errors.tolerance": "all",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "connect.bq-sink.deadletter",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"transforms": "tombstoneHandler",
"offset.flush.timeout.ms": "300000",
"transforms.dropNullRecords.predicate": "isNullRecord",
"transforms.dropNullRecords.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.tombstoneHandler.behavior": "drop_warn",
"transforms.tombstoneHandler.type": "io.aiven.kafka.connect.transforms.TombstoneHandler"

}
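A side note on the config above: only tombstoneHandler is registered under "transforms", and there is no "predicates" entry at all, so the transforms.dropNullRecords.* settings are inert. If the intent was to drop tombstones with the built-in Filter SMT, the wiring would need to look roughly like this minimal sketch, which reuses the dropNullRecords / isNullRecord names already present in the config:

{
  "transforms": "dropNullRecords",
  "transforms.dropNullRecords.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.dropNullRecords.predicate": "isNullRecord",
  "predicates": "isNullRecord",
  "predicates.isNullRecord.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}

Filter drops every record its predicate matches, so with RecordIsTombstone attached it removes tombstone records before they reach record conversion.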


nxowjjhe1#

In my case, I have handled this situation with a predicate, as follows:

{
   ...
   "predicates": "isTombstone",
   "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
   "transforms.x.predicate": "isTombstone",
   "transforms.x.negate": true
   ...
}

This follows the documentation: with transforms.x.negate set, tombstone records like these are skipped by the transform.
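Filled in, that pattern looks like the fragment below. The transform name x stands in for whatever SMT should only run on non-tombstone records; the extractAfter name, the ExtractField$Value transform, and the after field are placeholder assumptions for illustration, not part of the original answer:

{
   "predicates": "isTombstone",
   "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
   "transforms": "extractAfter",
   "transforms.extractAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
   "transforms.extractAfter.field": "after",
   "transforms.extractAfter.predicate": "isTombstone",
   "transforms.extractAfter.negate": true
}

With negate set, the transform applies only when the predicate does not match, so tombstone records bypass it untouched.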
