Schema for topic t1
{
"type": "record",
"name": "Envelope",
"namespace": "t1",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "id",
"type": {
"type": "long",
"connect.default": 0
},
"default": 0
},
{
"name": "createdAt",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
}
],
"connect.name": "t1.Value"
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
}
],
"connect.name": "t1.Envelope"
}
Schema for topic t2
{
"type": "record",
"name": "Value",
"namespace": "t2",
"fields": [
{
"name": "id",
"type": {
"type": "long",
"connect.default": 0
},
"default": 0
},
{
"name": "createdAt",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
}
],
"connect.name": "t2.Value"
}
S3 sink connector configuration
connector.class=io.confluent.connect.s3.S3SinkConnector
behavior.on.null.values=ignore
s3.region=us-west-2
partition.duration.ms=1000
flush.size=1
tasks.max=3
timezone=UTC
topics.regex=t1|t2
aws.secret.access.key=******
locale=US
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=false
name=s3-sink-connector
aws.access.key.id=******
errors.tolerance=all
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=s3-sink-connector-bucket
path.format=YYYY/MM/dd
timestamp.extractor=RecordField
timestamp.field=after.createdAt
With this connector configuration I get the error **"createdAt field does not exist"** for topic t2. If I instead set **timestamp.field=createdAt**, the same error **"createdAt field does not exist"** is thrown for topic t1.
How can I make the same connector pick up the **createdAt** field in both schemas?
Is this possible with a single S3 sink connector configuration?
If it is possible, how can I do it, and which properties do I need to set?
If anyone has an idea, please advise. If there is another way to achieve this, please suggest that as well.
1 Answer
All topics must use the same timestamp field; there is no way to configure a per-topic field mapping.
Your t2 schema has no `after` field, so you will need to run two separate connectors. The timestamp field also has to be present in every record, otherwise the partitioner will not work.
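A minimal sketch of the two-connector split described above, assuming hypothetical connector names; only the properties that differ between the two jobs are shown, everything else stays the same as in the original configuration.
Connector for t1 (Debezium envelope, timestamp nested under after):
name=s3-sink-connector-t1
topics=t1
timestamp.extractor=RecordField
timestamp.field=after.createdAt
Connector for t2 (flat record, timestamp at the top level):
name=s3-sink-connector-t2
topics=t2
timestamp.extractor=RecordField
timestamp.field=createdAt
Each connector then only ever sees records whose schema actually contains its configured timestamp.field, which avoids the "createdAt field does not exist" error on the other topic.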