Kafka: schema being registered is incompatible with an earlier schema for subject (ksqlDB database source connector error)

ffscu2ro, posted 2023-06-28 in Apache

I'm fairly new to Kafka and ksqlDB, and I'm trying to build a data pipeline with ksqlDB. I created a source connector to Postgres, defined as follows:

CREATE SOURCE CONNECTOR postgres_reader WITH (
    'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '5432',
    'database.user' = 'postgres-user',
    'database.password' = 'postgres-pw',
    'database.dbname' = 'customers',
    'database.server.name' = 'customers',
    'table.whitelist' = 'public.*',
    'value.converter' = 'io.confluent.connect.avro.AvroConverter',
    'key.converter' = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://host.docker.internal:8085',
    'key.converter.schema.registry.url' = 'http://host.docker.internal:8085',
    'transforms' = 'unwrap,createKey,ExtractField',
    'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
    'transforms.createKey.fields' = 'id',
    'transforms.ExtractField.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.ExtractField.field' = 'id',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.unwrap.drop.tombstones' = 'false',
    'transforms.unwrap.delete.handling.mode' = 'rewrite',
    'topic.prefix' = 'postgres'
);
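For reference, the connector's state can be checked from the ksqlDB CLI with standard ksqlDB statements (the exact output varies by version):

-- List all connectors and their status
SHOW CONNECTORS;

-- Show this connector's configuration and task status
DESCRIBE CONNECTOR postgres_reader;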

Now, in the Postgres DB I have a table named customers:

CREATE TABLE IF NOT EXISTS public.customers
(
    id text NOT NULL,
    name text,
    age integer,
    CONSTRAINT customers_pkey PRIMARY KEY (id)
);
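Rows are later inserted with plain SQL; a hypothetical example row (the values are made up for illustration):

INSERT INTO public.customers (id, name, age) VALUES ('c-1', 'Alice', 30);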

I also created a stream in ksqlDB:

CREATE STREAM customers (
    id VARCHAR key,
    name VARCHAR,
    age INTEGER
) WITH (
    kafka_topic = 'postgres.public.customers',
    value_format = 'avro',
    PARTITIONS=1
);
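The resulting definition can be inspected with DESCRIBE, a standard ksqlDB statement:

-- Shows the stream's columns plus its key/value serialization formats
DESCRIBE customers;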

When I create the stream as above, it automatically creates the topic and registers new value and key schemas in the Schema Registry, which look like this:
schema-postgres.public.customers-value-v1.avsc:

{
  "connect.name": "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema",
  "fields": [
    {
      "default": null,
      "name": "NAME",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "AGE",
      "type": [
        "null",
        "int"
      ]
    }
  ],
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "type": "record"
}

schema-postgres.public.customers-key-v1.avsc:

"string"

Now, when I insert rows into the customers table, the connector detects the new data and sends it to the corresponding Kafka topic as expected, but I get a schema error in Kafka Connect:

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "postgres.public.customers-value", details: [{errorType:'NAME_MISMATCH', description:'The name of the schema has changed (path '/name')', additionalInfo:'expected: io.confluent.ksql.avro_schemas.KsqlDataSourceSchema'}, {errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'id' at path '/fields/0' in the new schema has no default value and is missing in the old schema', additionalInfo:'id'}, {oldSchemaVersion: 1}, {oldSchema: '{"type":"record","name":"KsqlDataSourceSchema","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":"NAME","type":["null","string"],"default":null},{"name":"AGE","type":["null","int"],"default":null}],"connect.name":"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"}'}, {compatibility: 'BACKWARD'}]; error code: 409
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:544)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:532)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:490)

There are two things I don't understand:
1. Why does the connector try to register a new schema at all when it produces new messages to the topic?
2. Even if it does try to register a new schema, that schema should be identical to the existing one, since the value contains exactly the fields specified in the schema.

ehxuflar1#

I don't know whether this is a good solution, but in the end I simply used JSON instead of Avro on the stream, and JSON key and value converters on the connector.
Here is the final configuration.
ksqlDB stream creation, note value_format = 'JSON':

CREATE STREAM customers (
    id VARCHAR key,
    name VARCHAR,
    age INTEGER
) WITH (
    kafka_topic = 'postgres.public.customers',
    value_format = 'JSON',
    PARTITIONS=1
);

Connector creation, note the JsonConverter usage:

CREATE SOURCE CONNECTOR postgres_reader WITH (
  'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
  'database.hostname' = 'host.docker.internal',
  'database.port' = '5432',
  'database.user' = 'postgres-user',
  'database.password' = 'postgres-pw',
  'database.dbname' = 'customers',
  'database.server.name' = 'customers',
  'table.whitelist' = 'public.*',
  'transforms' = 'unwrap,extractField',
  'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
  'transforms.unwrap.drop.tombstones' = 'false',
  'transforms.unwrap.delete.handling.mode' = 'rewrite',
  'transforms.extractField.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
  'transforms.extractField.field' = 'id',
  'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false',
  'topic.prefix' = 'postgres'
);
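With this configuration, a quick end-to-end check is to insert a row in Postgres and watch it arrive through a push query (the sample row is made up):

-- In Postgres:
INSERT INTO public.customers (id, name, age) VALUES ('c-2', 'Bob', 41);

-- In ksqlDB (push query, runs until cancelled):
SELECT * FROM customers EMIT CHANGES;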
