仅当重命名字段存在时才转换它

ie3xauqp  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(338)

我有一个s3接收器连接器用于多个主题(topic\u a,topic\u b,topic\u c),topic\u a有字段created\u date,topic\u b,topic\u c有created\u date。我用过下面的 transforms.RenameField.renames 重命名字段(已创建)_date:creation_date)但是,由于只有一个主题创建了日期,而其他主题没有,所以连接器出现故障。
我想将所有消息(来自带有单个连接器的所有主题)移动到带有creation\u date的s3中(如果存在,则将created\u date重命名为creation\u date),但我无法找出regex或transformer来重命名特定主题的字段(如果存在)。

"config":{
      "connector.class":"io.confluent.connect.s3.S3SinkConnector",
      "errors.log.include.messages":"true",
      "s3.region":"eu-west-1",
      "topics.dir":"dir",
      "flush.size":"5",
      "tasks.max":"2",
      "s3.part.size":"5242880",
      "timezone":"UTC",
      "locale":"en",
      "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
      "errors.log.enable":"true",
      "s3.bucket.name":"bucket",
      "topics": "topic_a, topic_b, topic_c",
      "s3.compression.type":"gzip",
      "partitioner.class":"io.confluent.connect.storage.partitioner.DailyPartitioner",
      "name":"NAME",
      "storage.class":"io.confluent.connect.s3.storage.S3Storage",
      "key.converter.schemas.enable":"true",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter.schemas.enable":"true",
      "value.converter":"io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url":"https://schemaregistry.com",
      "enhanced.avro.schema.support": "true",
      "transforms": "RenameField",
      "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.RenameField.renames": "created_date:creation_date"
   }
62lalag4

62lalag41#

只有主题a创建了日期,其他人没有,
然后你会使用不同的连接器。一个带有变换,所有主题带有字段,然后另一个没有变换。
从带有单个连接器的所有主题
这个比例不太好。你正在制作有限的用户线程和一个用户组来同时阅读许多主题。多个连接器将更好地分配负载。

相关问题