I have been trying to generate data with the Java client and store it in Postgres using a sink connector. I can publish the data to the topic in protobuf format, but the sink connector does not seem to work. To make sure the data was actually published and available in the topic, I built a consumer in the Java client (sketched below), and I can see the data.
So the data is in the topic, but something is wrong with my sink connector configuration. The configuration details are below; any help is much appreciated!
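A simplified sketch of the verification consumer (not the exact code; the group id, deserializer choice, and reading the value as raw bytes before parsing it with the generated Album class are just illustrative):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.test.extraction.model.protobuf.AlbumOuterClass.Album;

public class TopicVerifier {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "topic-verifier");   // illustrative group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("docker-proto-topic-2"));
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, byte[]> record : records) {
                // Parse the raw bytes with the protobuf class generated for Album
                Album album = Album.parseFrom(record.value());
                System.out.println(album);
            }
        }
    }
}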
connect-standalone.properties:
bootstrap.servers=localhost:9092
key.converter=com.blueplant.connect.protobuf.ProtobufConverter
value.converter=com.blueplant.connect.protobuf.ProtobufConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connect-postgres-protobuf.properties:
name=sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=docker-proto-topic-2
connection.url=jdbc:postgresql://localhost:5432/kafka-test
connection.user=postgres
connection.password=***
insert.mode=insert
value.converter=com.blueplant.connect.protobuf.ProtobufConverter
value.converter.protoClassName=com.test.extraction.model.protobuf.AlbumOuterClass$Album
key.converter=com.blueplant.connect.protobuf.ProtobufConverter
auto.create=true
auto.evolve=false
offset.storage.file.filename=/tmp/post-sink.offsets
Album data:
artist: "Tears For Fears"
song_title: "Songs from the Big Chair"
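The records are produced from the Java client roughly like this (again a simplified sketch; the builder method names assume the generated Album class exposes the artist and song_title fields shown above):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.test.extraction.model.protobuf.AlbumOuterClass.Album;

public class AlbumProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        // Build the protobuf message and publish its serialized bytes to the topic
        Album album = Album.newBuilder()
                .setArtist("Tears For Fears")
                .setSongTitle("Songs from the Big Chair")
                .build();

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("docker-proto-topic-2", album.toByteArray()));
            producer.flush();
        }
    }
}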
Error log:
[task-thread-sink-postgres-1] INFO io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect - Checking PostgreSql dialect for existence of table "docker-proto-topic-2"
[task-thread-sink-postgres-1] INFO io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect - Using PostgreSql dialect table "docker-proto-topic-2" absent
[task-thread-sink-postgres-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - WorkerSinkTask{id=sink-postgres-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.
org.apache.kafka.connect.errors.ConnectException: null (ARRAY) type doesn't have a mapping to the SQL database column type
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1727)