我的想法是使用Debezium嵌入式连接器与Sping Boot 并连接到SQL Server数据库。我看到的唯一问题是如何跟踪偏移量,理想情况下不使用数据库。有没有类似的例子,或者有没有人遇到类似的问题?与Debezium的联系是这样的:
public String buildDebeziumConnectorString() {
StringBuilder sb = new StringBuilder(SQLSERVER_CONNECTOR_NAME);
sb.append("?databaseHostName=").append(properties.getDatabaseHostName())
.append("&databasePort=").append(properties.getDatabasePort())
.append("&databaseUser=").append(properties.getDatabaseUser())
.append("&databasePassword=").append(properties.getDatabasePassword())
.append("&databaseServerName=").append(properties.getDatabaseServerName())
.append("&includeSchemaChanges=").append(properties.isIncludeSchemaChanges())
.append("&databaseDbname=").append(properties.getDatabaseDbname())
.append("&tableWhitelist=").append(properties.getTableWhitelist())
.append("&offsetStorageFileName=").append(properties.getOffsetStorageFileName())
.append("&databaseHistoryFileFilename=").append(properties.getDatabaseHistoryFileFilename());
return sb.toString();}
特别是,(“&offsetName=”)是我最关心的部分。
我在一个分布式环境中,因为我已经在使用Kafka,所以我想用它来跟踪偏移量。你知道Debezium嵌入式连接器是否可以做到这一点吗?
1条答案
按热度按时间ax6ht2ek1#
您正在使用Debezium,因此您已经“拥有数据库”。(你正在从中提取的)。
一般来说,您不会使用独立/嵌入模式,因为这是以有状态的方式管理自己的偏移量的原因。
相反,您可以在“分布式模式”下运行Kafka Connect / Debezium Server,然后Kafka负责存储数据包和偏移量。
换句话说,使用文件假设您只在静态服务器上的一个示例上运行(因为该文件不会分发给其他使用者进程,例如,在高可用性部署场景中),而不是在临时/无状态服务器上运行
那么,既然你似乎在使用Apache Camel,你有没有注意到文件名参数不是必需的,而是基于Kafka的
offsetStorage
和offsetTopic
的其他设置,它的复制因子等?同样,数据库历史也应该使用一个主题。在Debezium docs中也可以找到相同的内容。因为我已经在使用Kafka
Kafka也可以用于“非分布式环境”