我已经用 Postgresql
数据库,使用debezium连接器与kafka连接和kafka。kafka有多个示例(3)在运行,配置了zookeeper(3),整个管道中的连接都在工作,但是根据debezium的文档,没有根据数据库中的表自动创建主题。例如,如果表a和表b在某个模式中,我假设这两个主题是在kafka中隐式创建的。连接器和任务的状态为 RUNNING
,下面提到的是我为连接器所做的配置,
{
"name": "geo-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": <dbHostName>,
"database.port": <dbPort>,
"database.user": <dbUser>,
"database.password":<dbPassword> ,
"database.dbname" : <dbName>,
"database.server.name": <logicalName>,
"database.history.kafka.bootstrap.servers":<>,
"database.history.kafka.topic": "schema-changes.inventory",
"plugin.name":"wal2json",
"config.storage.replication.factor": "3",
"offset.storage.replication.factor" : "3",
"auto.create.topics.enable" : "true",
"snapshot.mode" : "always"
}
}
我在连接日志中看到的错误如下:,
2018-08-09 15:28:50,409 - DEBUG [KafkaBasedLog Work Thread - kconnect-offsets:Fetcher@199] - [Consumer clientId=consumer-1, groupId=1] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(kconnect-offsets-10, kconnect-offsets-4, kconnect-offsets-16, kconnect-offsets-7, kconnect-offsets-19, kconnect-offsets-13, kconnect-offsets-22, kconnect-offsets-1)) to broker kafka-02.hotel02.pro06.eu.idealo.com:9092 (id: 2002 rack: pro06)
2018-08-09 15:28:50,465 - DEBUG [kafka-producer-network-thread | producer-6:NetworkClient$DefaultMetadataUpdater@927] - [Producer clientId=producer-6] Sending metadata request (type=MetadataRequest, topics=dbserver1.public.spatial_ref_sys) to node kafka-01.hotel02.pro05.eu.idealo.com:9092 (id: 2004 rack: pro05)
2018-08-09 15:28:50,467 - WARN [kafka-producer-network-thread | producer-6:NetworkClient$DefaultMetadataUpdater@882] - [Producer clientId=producer-6] Error while fetching metadata with correlation id 23856 : {dbserver1.public.spatial_ref_sys=UNKNOWN_TOPIC_OR_PARTITION}
2018-08-09 15:28:50,467 - DEBUG [kafka-producer-network-thread | producer-6:Metadata@270] - Updated cluster metadata version 23852 to Cluster(id = BwqlZApfT-ygzWr_wPcdng, nodes = [kafka-03.hotel02.pro05.eu.idealo.com:9092 (id: 2003 rack: pro05), kafka-01.hotel02.pro05.eu.idealo.com:9092 (id: 2004 rack: pro05), kafka-02.hotel02.pro06.eu.idealo.com:9092 (id: 2002 rack: pro06)], partitions = [])
1条答案
按热度按时间wljmcqd81#
您在日志中看到的消息是警告,而不是错误。你能试试吗
kafka-topics.sh
实用程序列出可用的主题?