如何在Kafka中使用sftp csv连接器时将键设置为null,我需要这个来分发记录在循环的方式跨经纪人

edqdpe6u  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(86)

这是我为我的csv源连接器使用的配置。这是将key作为“{}”生成一个空结构。我认为这导致所有的消息都聚集在一个分区中。

"name": "NdpiSourceConnector_csv_1",
    "config": {
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
        "cleanup.policy":"MOVE",
        "behavior.on.error":"IGNORE",
        "input.path": "/home/******/Desktop/ndpi/drop",
        "error.path": "/home/******/Desktop/ndpi/error",
        "finished.path": "/home/shyena/Desktop/ndpi/processed",
        "input.file.pattern": ".*\\.csv",
        "sftp.username":"********",
        "sftp.password":"**********",
        "sftp.host":"**************",
        "sftp.port":22,
        "kafka.topic": "NdpiSourceTopic1",
        "csv.first.row.as.header": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable":"false",
        "key.schema":"{\"type\":\"STRUCT\",\"name\":\"FlowRecord\",\"isOptional\":\"true\",\"fieldSchemas\":{}}",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.schema": "{\"type\":\"STRUCT\",\"name\":\"FlowRecord\",\"fieldSchemas\":{\"flow_id\":{\"type\":\"STRING\"},\"protocol\":{\"type\":\"STRING\"},\"first_seen\":{\"type\":\"STRING\"},\"last_seen\":{\"type\":\"STRING\"},\"duration\":{\"type\":\"STRING\"},\"src_ip\":{\"type\":\"STRING\"},\"src_port\":{\"type\":\"STRING\"},\"dst_ip\":{\"type\":\"STRING\"},\"dst_port\":{\"type\":\"STRING\"}}}",
        "value.converter.schemas.enable":"false"
    }
  }

从文档中我了解到,向key传递空值将在RR中分发记录。当我尝试使用python脚本插入记录而不指定键时,记录将以循环方式分发。
什么是我需要做的时候使用连接器得到相同的结果,请帮助我。

6vl6ewon

6vl6ewon1#

要将键设置为null(并让生产者分区程序使用空键完成其工作),您可以向连接器添加SMT:

// Add the following SMT configuration
"transforms": "setKeyToNull",
"transforms.setKeyToNull.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.setKeyToNull.blacklist": ".*"

相关问题