这是我为我的csv源连接器使用的配置。这是将key作为“{}”生成一个空结构。我认为这导致所有的消息都聚集在一个分区中。
"name": "NdpiSourceConnector_csv_1",
"config": {
"tasks.max": "1",
"connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
"cleanup.policy":"MOVE",
"behavior.on.error":"IGNORE",
"input.path": "/home/******/Desktop/ndpi/drop",
"error.path": "/home/******/Desktop/ndpi/error",
"finished.path": "/home/shyena/Desktop/ndpi/processed",
"input.file.pattern": ".*\\.csv",
"sftp.username":"********",
"sftp.password":"**********",
"sftp.host":"**************",
"sftp.port":22,
"kafka.topic": "NdpiSourceTopic1",
"csv.first.row.as.header": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"key.schema":"{\"type\":\"STRUCT\",\"name\":\"FlowRecord\",\"isOptional\":\"true\",\"fieldSchemas\":{}}",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.schema": "{\"type\":\"STRUCT\",\"name\":\"FlowRecord\",\"fieldSchemas\":{\"flow_id\":{\"type\":\"STRING\"},\"protocol\":{\"type\":\"STRING\"},\"first_seen\":{\"type\":\"STRING\"},\"last_seen\":{\"type\":\"STRING\"},\"duration\":{\"type\":\"STRING\"},\"src_ip\":{\"type\":\"STRING\"},\"src_port\":{\"type\":\"STRING\"},\"dst_ip\":{\"type\":\"STRING\"},\"dst_port\":{\"type\":\"STRING\"}}}",
"value.converter.schemas.enable":"false"
}
}
从文档中我了解到,向key传递空值将在RR中分发记录。当我尝试使用python脚本插入记录而不指定键时,记录将以循环方式分发。
什么是我需要做的时候使用连接器得到相同的结果,请帮助我。
1条答案
按热度按时间6vl6ewon1#
要将键设置为
null
(并让生产者分区程序使用空键完成其工作),您可以向连接器添加SMT: