我正在创建一个自定义的Kafka连接转换,用于在将源代码中的某些数据写入Kafka主题之前将其转换。我试着让它在没有模式的情况下工作(也许只工作)。然而,我的代码似乎总是认为有一个模式。
下面是转换中的一些代码:
@Override
public R apply(R record) {
if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
// return applyWithSchema(record);
throw new IllegalStateException("MyTransform does not currently support schemas" + operatingSchema(record));
}
}
public static class Value<R extends ConnectRecord<R>> extends MyTransform<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}
@Override
protected Object operatingValue(R record) {
return record.value();
}
那个 IllegalStateException
每次都抛出。这是我的连接器配置,我在本地kafka connect上运行它 confluent local start
.
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"transforms" : "convert",
"transforms.convert.type" : "com.org.team.kafka.connect.transform.common.MyTransform$Value"
如果我移除转换,让kafka connect消费并将未修改的数据推送到我的主题,它实际上会将它推送到没有模式的主题,它看起来是这样的:
{
"id": 1,
"application": "app",
"event": "{\"id\": 42, \"type\": \"add\", \"eventDate\": {\"hour\": 15, \"nano\": 298633600, \"year\": 2020, \"month\": \"JUNE\", \"minute\": 34, \"offset\": {\"id\": \"-04:00\", \"rules\": {\"fixedOffset\": true, \"transitions\": [], \"transitionRules\": []}, \"totalSeconds\": -14400}, \"second\": 37, \"dayOfWeek\": \"THURSDAY\", \"dayOfYear\": 177, \"dayOfMonth\": 25, \"monthValue\": 6}}",
"created_ts": 1593099276319,
"updated_ts": 1593099276319,
"context": "action"
}
所以,如果它没有模式,只是纯json,为什么我的代码会返回 record.valueSchema
. 它是一个 Struct
,顺便说一下。
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!