I'm fairly new to Kafka and ksqlDB, and I'm trying to build a data pipeline with ksqlDB. I created a source connector to Postgres, defined as follows:
CREATE SOURCE CONNECTOR postgres_reader WITH (
    'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '5432',
    'database.user' = 'postgres-user',
    'database.password' = 'postgres-pw',
    'database.dbname' = 'customers',
    'database.server.name' = 'customers',
    'table.whitelist' = 'public.*',
    'value.converter' = 'io.confluent.connect.avro.AvroConverter',
    'key.converter' = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://host.docker.internal:8085',
    'key.converter.schema.registry.url' = 'http://host.docker.internal:8085',
    'transforms' = 'unwrap,createKey,ExtractField',
    'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
    'transforms.createKey.fields' = 'id',
    'transforms.ExtractField.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.ExtractField.field' = 'id',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.unwrap.drop.tombstones' = 'false',
    'transforms.unwrap.delete.handling.mode' = 'rewrite',
    'topic.prefix' = 'postgres'
);
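To confirm the connector actually started before debugging anything else, its state can be checked with standard ksqlDB statements (shown here only as a sanity check):

-- list connectors, then inspect the state and tasks of this one
SHOW CONNECTORS;
DESCRIBE CONNECTOR postgres_reader;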
Now, in the Postgres database I have a table named customers:
CREATE TABLE IF NOT EXISTS public.customers
(
    id text NOT NULL,
    name text,
    age integer,
    CONSTRAINT customers_pkey PRIMARY KEY (id)
);
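For illustration, rows in this table are inserted with plain SQL like the following (the values here are made up):

-- example insert; any row with a unique id will do
INSERT INTO public.customers (id, name, age) VALUES ('1', 'Alice', 30);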
I also created a stream in ksqlDB:
CREATE STREAM customers (
    id VARCHAR KEY,
    name VARCHAR,
    age INTEGER
) WITH (
    kafka_topic = 'postgres.public.customers',
    value_format = 'avro',
    partitions = 1
);
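Once the stream exists, a standard ksqlDB push query can be used to watch rows arrive from the connector (shown for illustration):

-- continuously emit new rows as they land on the topic
SELECT * FROM customers EMIT CHANGES;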
When I create the stream as above, it automatically creates the topic and registers new value and key schemas in the Schema Registry, which look like this:
schema-postgres.public.customers-value-v1.avsc:
{
  "connect.name": "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema",
  "fields": [
    {
      "default": null,
      "name": "NAME",
      "type": ["null", "string"]
    },
    {
      "default": null,
      "name": "AGE",
      "type": ["null", "int"]
    }
  ],
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "type": "record"
}
schema-postgres.public.customers-key-v1.avsc:
"string"
Now, when I insert rows into the customers table, the connector picks up the new data and produces it to the corresponding Kafka topic as expected, but Kafka Connect reports a schema error:
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "postgres.public.customers-value", details: [{errorType:'NAME_MISMATCH', description:'The name of the schema has changed (path '/name')', additionalInfo:'expected: io.confluent.ksql.avro_schemas.KsqlDataSourceSchema'}, {errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'id' at path '/fields/0' in the new schema has no default value and is missing in the old schema', additionalInfo:'id'}, {oldSchemaVersion: 1}, {oldSchema: '{"type":"record","name":"KsqlDataSourceSchema","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":"NAME","type":["null","string"],"default":null},{"name":"AGE","type":["null","int"],"default":null}],"connect.name":"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"}'}, {compatibility: 'BACKWARD'}]; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:544)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:532)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:490)
There are two things I don't understand:
1. Why does the connector try to register a new schema at all when it produces new messages to the topic?
2. Even if it does try to register one, the schema should be identical, since the value contains exactly the fields specified in the registered schema.
1 Answer

ehxuflar1#:
I don't know if this is a good solution, but in the end I simply used JSON instead of Avro for the stream, and JSON key and value converters on the connector. This is the final configuration: in the ksql stream creation, note value_format = 'JSON'; in the connector creation, note the JsonConverter usage, as in the sketch below.
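A minimal sketch of that final configuration, reusing the topic, table, and connection settings from the question; the schemas.enable = 'false' converter settings are an assumption (they keep the embedded Connect schema out of each JSON message):

-- stream over the same topic, now reading plain JSON values
CREATE STREAM customers (
    id VARCHAR KEY,
    name VARCHAR,
    age INTEGER
) WITH (
    kafka_topic = 'postgres.public.customers',
    value_format = 'JSON',
    partitions = 1
);

-- same Debezium connector, with JSON converters instead of Avro
CREATE SOURCE CONNECTOR postgres_reader WITH (
    'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '5432',
    'database.user' = 'postgres-user',
    'database.password' = 'postgres-pw',
    'database.dbname' = 'customers',
    'database.server.name' = 'customers',
    'table.whitelist' = 'public.*',
    'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
    'key.converter.schemas.enable' = 'false',
    'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false',
    'transforms' = 'unwrap,createKey,ExtractField',
    'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
    'transforms.createKey.fields' = 'id',
    'transforms.ExtractField.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.ExtractField.field' = 'id',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.unwrap.drop.tombstones' = 'false',
    'transforms.unwrap.delete.handling.mode' = 'rewrite',
    'topic.prefix' = 'postgres'
);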