Kafka 我可以使用单一的s3-sink连接器指向相同的字段名称的时间戳字段通过使用不同类型的Avro架构的不同主题?

pw9qyyiw  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(104)

主题t1的模式

{
  "type": "record",
  "name": "Envelope",
  "namespace": "t1",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
            },
            {
              "name": "createdAt",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.ZonedTimestamp"
                }
              ],
              "default": null
            },
           
          ],
          "connect.name": "t1.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    }
   
  ],
  "connect.name": "t1.Envelope"
}

主题T2的模式

{
    "type": "record",
    "name": "Value",
    "namespace": "t2",
    "fields": [
          {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
          },
          {
            "name": "createdAt",
            "type": [
                "null",
                {
                    "type": "string",
                    "connect.version": 1,
                    "connect.name": "io.debezium.time.ZonedTimestamp"
                }
            ],
            "default": null
          }
    ],
    "connect.name": "t2.Value"
}

s3-sink连接器配置

connector.class=io.confluent.connect.s3.S3SinkConnector
behavior.on.null.values=ignore
s3.region=us-west-2
partition.duration.ms=1000
flush.size=1
tasks.max=3
timezone=UTC
topics.regex=t1,t2
aws.secret.access.key=******
locale=US
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=false
name=s3-sink-connector
aws.access.key.id=******
errors.tolerance=all
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=s3-sink-connector-bucket
path.format=YYYY/MM/dd
timestamp.extractor=RecordField
timestamp.field=after.createdAt

通过使用此连接器配置,我得到了t2主题的错误,即**“createdAt field does not exist”。如果我设置timestamp.field = createdAt**,则会为**t1主题“createdAt field does not exist”抛出错误。
如何使用同一个连接器同时指向两个模式中的
“createdAt”**字段?
是否可以通过使用单个s3-sink连接器配置来实现这一点?
如果这种情况是可能的,那么我如何才能做到这一点,我必须使用哪些属性来实现这一点?
如果任何人对此有想法,请对此提出建议。如果有其他方法可以做到这一点,请也建议这样做。

3phpmpom

3phpmpom1#

所有主题都需要相同的时间戳字段;无法配置主题到字段的Map。
您的t2模式没有after字段,因此需要运行两个单独的连接器
该字段还需要出现在所有记录中,否则分区程序将无法工作。

相关问题