kafka connect实现错误

sulc1iza  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(455)

我在这里浏览教程:http://kafka.apache.org/documentation.html#introduction
当我进入“第7步:使用kafka connect导入/导出数据”并尝试启动两个连接器时,出现以下错误:

ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding messages, 1 left

ERROR Failed to commit offsets for WorkerSourceTask

以下是本教程的一部分:
接下来,我们将启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。我们提供了三个配置文件作为参数。第一个始终是kafka connect进程的配置,包含公共配置,例如要连接到的kafka代理和数据的序列化格式。其余的配置文件分别指定要创建的连接器。这些文件包括唯一的连接器名称、要示例化的连接器类以及连接器所需的任何其他配置。 bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties 我花了一些时间寻找解决办法,但找不到任何有用的办法。感谢您的帮助。
谢谢!

wz3gfoph

wz3gfoph1#

你需要先启动zookeeper和kafka服务器,然后再运行该行。

启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

启动多个kafka服务器

bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties

启动连接器

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

然后你会看到一些行被写进 test.sink.txt :

foo
bar

您可以启动消费者进行检查:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
kknvjkwl

kknvjkwl2#

在运行kafka connect之前,需要启动kafka服务器和zookeeper。您需要首先执行下面“步骤2:启动服务器”中的cmds:
bin/zookeeper-server-start.sh配置/zookeeper.properties
bin/kafka-server-start.sh配置/server.properties
从here:httpshttp://mail-archives.apache.org/mod\u mbox/kafka-users/201601.mbox/%3ccak0bmepgwml93wgm2jvckbut5raziawzorotfc_a6q=gaxqgfq@mail.gmail.com%3e

z2acfund

z2acfund3#

如果您使用如下主机名配置kafka代理 my.sandbox.com 确保修改 config/connect-standalone.properties 因此:

bootstrap.servers=my.sandbox.com:9092

在hortonworks hdp上,默认端口为6667,因此设置为

bootstrap.servers=my.sandbox.com:6667

如果启用了kerberos,则还需要以下设置(不带ssl):

security.protocol=PLAINTEXTSASL
producer.security.protocol=PLAINTEXTSASL
producer.sasl.kerberos.service.name=kafka
consumer.security.protocol=PLAINTEXTSASL
consumer.sasl.kerberos.service.name=kafka
btqmn9zl

btqmn9zl4#

出现此错误的原因是,我使用config/server.properties创建的第一台服务器没有运行。我假设因为它是主题的开头,所以消息不能被刷新,偏移量不能被提交。
使用服务器属性(config/server.properties)启动kafka服务器后,这个问题就解决了。

相关问题