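I have a JDBC sink connector that consumes data produced by a Postgres source connector, and I need to copy data from the source table "parametros" to the target table "parametros_sistema". I am using the org.apache.kafka.connect.transforms.ReplaceField$Value transform to map specific columns from source to target, because my column names differ between the two, but when I start my sink connector I get this error:
ERROR [collaborator-sink-postgres-connector|task-0] WorkerSinkTask{id=collaborator-sink-postgres-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Table "collab-service"."public"."parametros" is missing and auto-creation is disabled (org.apache.kafka.connect.runtime.WorkerSinkTask:586)
The problem is that I cannot set the target table name: the connector expects a table in my target database with the same name as the source table. Does anyone know how to solve this?
P.S.: Both the source and the target are Postgres databases.
My source connector: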
debezium/debezium-connector-postgresql:2.2.1
name = collaborator-postgres-connector
connector.class = io.debezium.connector.postgresql.PostgresConnector
tasks.max = 1
topic.prefix = collab-service
database.hostname = host
database.server.name = collaborator-postgres-server
database.port = 5432
database.user = user
database.password = password
database.dbname = sflm_dev
plugin.name = pgoutput
slot.name = collab_test
decimal.handling.mode = double
snapshot.mode = always
schema.name.adjustment.mode = none
table.include.list = public.parametros
database.history.kafka.topic = postgres_history
database.history.kafka.bootstrap.servers = kafka:9092
message.prefix.include.list = after
My sink connector:
confluentinc/kafka-connect-jdbc:10.7.4
name = collaborator-sink-postgres-connector
connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max = 1
topics = collab-service.public.parametros
connection.url = host
connection.user = user
connection.password = password
database = collaborator_suite
auto.create = false
insert.mode = upsert
pk.mode = record_key
transforms = timestampConverter,replaceField
transforms.timestampConverter.type = org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.timestampConverter.field = para_dt_cadastro, para_dt_ult_alt
transforms.timestampConverter.target.type = Timestamp
transforms.replaceField.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.replaceField.renames = para_cd_id:pasi_cd_id, para_tx_dominio:pasi_tx_dominio, para_tx_descricao:pasi_tx_descricao, para_tx_valor:pasi_tx_valor, para_tx_tipo:pasi_tx_tipo, para_dt_ult_alt:pasi_dt_ult_alt, para_dt_cadastro:pasi_dt_cadastro, para_dt_cadastro:pasi_dt_cadastro, para_dt_ult_alt:pasi_dt_ult_alt, usua_cd_id_cadastro:usua_cd_id_cadastro, usua_cd_id_ult_alt:usua_cd_id_ult_alt, para_tx_sistema:pasi_tx_sistema
db.timezone = UTC
pk.fields = para_cd_id
An example of one of the messages in my topic:
[
{
"topic": "collab-service.public.parametros",
"partition": 0,
"offset": 1,
"timestamp": 1704296444971,
"timestampType": "CREATE_TIME",
"headers": [],
"key": "Struct{para_cd_id=2}",
"value": {
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "para_cd_id"
},
{
"type": "string",
"optional": false,
"field": "para_tx_dominio"
},
{
"type": "string",
"optional": false,
"field": "para_tx_descricao"
},
{
"type": "string",
"optional": false,
"field": "para_tx_valor"
},
{
"type": "string",
"optional": false,
"field": "para_tx_tipo"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "para_dt_cadastro"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "para_dt_ult_alt"
},
{
"type": "int64",
"optional": true,
"field": "usua_cd_id_cadastro"
},
{
"type": "double",
"optional": true,
"field": "usua_cd_id_ult_alt"
},
{
"type": "string",
"optional": true,
"field": "para_tx_sistema"
}
],
"optional": true,
"name": "collab-service.public.parametros.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "para_cd_id"
},
{
"type": "string",
"optional": false,
"field": "para_tx_dominio"
},
{
"type": "string",
"optional": false,
"field": "para_tx_descricao"
},
{
"type": "string",
"optional": false,
"field": "para_tx_valor"
},
{
"type": "string",
"optional": false,
"field": "para_tx_tipo"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "para_dt_cadastro"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "para_dt_ult_alt"
},
{
"type": "int64",
"optional": true,
"field": "usua_cd_id_cadastro"
},
{
"type": "double",
"optional": true,
"field": "usua_cd_id_ult_alt"
},
{
"type": "string",
"optional": true,
"field": "para_tx_sistema"
}
],
"optional": true,
"name": "collab-service.public.parametros.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "collab-service.public.parametros.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"para_cd_id": 2,
"para_tx_dominio": "SISTEMA_CLOUD_PROJECTID",
"para_tx_descricao": "Variável para referenciar o id do projeto na nuvem do Google",
"para_tx_valor": "collaborator-364516",
"para_tx_tipo": "SISTEMA",
"para_dt_cadastro": 1669047531542921,
"para_dt_ult_alt": 1669047531542921,
"usua_cd_id_cadastro": null,
"usua_cd_id_ult_alt": null,
"para_tx_sistema": "R2D2"
},
"source": {
"version": "2.2.1.Final",
"connector": "postgresql",
"name": "collab-service",
"ts_ms": 1704296442190,
"snapshot": "last",
"db": "sflm_dev",
"sequence": "[null,\"17335024878048\"]",
"schema": "public",
"table": "parametros",
"txId": 989004,
"lsn": 17335024878048,
"xmin": null
},
"op": "r",
"ts_ms": 1704296444402,
"transaction": null
}
}
}
]
1 Answer

The table.name.format property did the job.
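
For reference, here is a minimal sketch of what that change might look like in the sink connector from the question. The JDBC sink's table.name.format defaults to ${topic}, which would explain why the connector looked for a table named after the topic collab-service.public.parametros. The value parametros_sistema is the target table named in the question; the answer does not show the exact setting used, so treat this line as an assumption rather than the poster's actual configuration.

```properties
# Added to the sink connector config (collaborator-sink-postgres-connector).
# Default is ${topic}, i.e. the table name is derived from the topic name.
# Assumption: the target table is parametros_sistema, as stated in the question.
table.name.format = parametros_sistema
```

With a fixed table name like this, every topic listed in topics is written to the same table, which is fine here because the connector subscribes to a single topic.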