I'm setting up Kafka on a DigitalOcean droplet using the Confluent Kafka all-in-one Docker image. I was able to run Kafka successfully and add the HDFS connector via the Kafka Connect REST API (I replaced HOST_IP with the IP of my Cloudera CDH droplet):

```
curl -X POST \
-H "Content-Type: application/json" \
--data '{
"name": "hdfs-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "test_hdfs",
"hdfs.url": "hdfs://HOST_IP:8020",
"flush.size": "3",
"name": "hdfs-sink"
}}' \
http://HOST_IP:8083/connectors
```
Then, when I curl Kafka Connect for the HDFS sink status, I get the following error in the JSON response under `tasks` (the connector state is RUNNING, but the task failed):
```
java.lang.RuntimeException: io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.SubjectNameStrategy
```
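For reference, the status check is just the Connect REST status endpoint (a sketch; substitute your droplet's IP):

```
# Query the status of the hdfs-sink connector and its tasks
curl -s http://HOST_IP:8083/connectors/hdfs-sink/status
```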
UPDATE: I managed to get past this error by using the 5.0.0 tag rather than the beta, as recommended by cricket_007 (silly me).
However, when I actually try to publish data for the sink to write to HDFS, I get a different error. I'm using ksql-datagen to generate fake data:

```
docker-compose exec ksql-datagen ksql-datagen quickstart=users format=json topic=test_hdfs maxInterval=1000 \
    propertiesFile=/etc/ksql/datagen.properties bootstrap-server=broker:9092
```

The connector status now reports:
```
{
"name": "hdfs-sink",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [{
"state": "FAILED",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: test_hdfs\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n",
"id": 0,
"worker_id": "connect:8083"
}],
"type": "sink"
}
```
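"Unknown magic byte!" means the AvroConverter received records that are not framed the Confluent way (magic byte 0 plus a 4-byte schema id), which is exactly what the JSON datagen produced. A quick sanity check is to dump a few raw records from the topic (a sketch, assuming the `broker` service name from the Compose file):

```
# Print the first few raw records; Confluent-Avro records start with
# unprintable framing bytes, while plain JSON records are readable text
docker-compose exec broker kafka-console-consumer \
    --bootstrap-server broker:9092 \
    --topic test_hdfs \
    --from-beginning --max-messages 3
```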
EDIT 2
Stack trace from the failing Avro ksql-datagen run:

```
Outputting 1000000 to test_hdfs
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing row to topic test_hdfs using Converter API
Caused by: org.apache.kafka.connect.errors.DataException: test_hdfs
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:77)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:44)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:27)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:854)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:816)
at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:94)
at io.confluent.ksql.datagen.DataGen.main(DataGen.java:100)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1334)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1309)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:172)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:320)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:312)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:114)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:153)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:116)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:44)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:27)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:854)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:816)
at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:94)
at io.confluent.ksql.datagen.DataGen.main(DataGen.java:100)
```
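The `Connection refused` comes from the Schema Registry client (`RestService.registerSchema` in the trace); presumably datagen fell back to its default `localhost:8081` registry URL, which does not resolve from inside the container. Passing the registry service explicitly, as in EDIT 3 below, should avoid this (a sketch):

```
# Same generator, but pointed at the schema-registry service by name
docker-compose exec ksql-datagen ksql-datagen quickstart=users format=avro \
    schemaRegistryUrl=http://schema-registry:8081 topic=test_hdfs maxInterval=1000 \
    propertiesFile=/etc/ksql/datagen.properties bootstrap-server=broker:9092
```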
EDIT 3
For some reason, even when I generate Avro data with ksql-datagen, I still get a JSON serialization error in Kafka Connect.

```
docker-compose exec ksql-datagen ksql-datagen schema=/impressions.avro format=avro \
    schemaRegistryUrl=http://schema-registry:8081 key=impressionid topic=test_hdfs maxInterval=1000 \
    propertiesFile=/etc/ksql/datagen.properties bootstrap-server=broker:9092
```

The connector is registered with:

```
curl -X POST \
-H "Content-Type: application/json" \
--data '{
"name": "hdfs-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
"tasks.max": "1",
"schema.compatibility": "FULL",
"topics": "test_hdfs",
"hdfs.url": "hdfs://cdh.nuvo.app:8020",
"flush.size": "3",
"name": "hdfs-sink"
}}' \
http://kafka.nuvo.app:8083/connectors
```
Schema Registry converter config (Connect worker properties):

```
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
```
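One caveat with this file (an assumption about the Compose setup): if the Connect worker runs inside the Docker network, `localhost:8081` will not reach the Schema Registry container; the Compose service name would normally be used instead:

```
# Hypothetical override for a worker running inside the Compose network
key.converter.schema.registry.url=http://schema-registry:8081
value.converter.schema.registry.url=http://schema-registry:8081
```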
Kafka Connect logs (note that `impression_816` in the parse failure below is the record key produced by `key=impressionid`, a plain string, which the JsonConverter cannot parse as JSON):

```
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'impression_816': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"impression_816"; line: 1, column: 29]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'impression_816': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"impression_816"; line: 1, column: 29]
```
EDIT 4

```
[2018-08-22 02:05:51,140] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: test_hdfs1
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2018-08-22 02:05:51,141] ERROR WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2018-08-22 02:05:51,243] INFO Publish thread interrupted for client_id=consumer-8 client_type=CONSUMER session= cluster=lUWD_PR0RsiTkaunoUrUfA group=connect-hdfs-sink (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
```
1 Answer
You ran

```
ksql-datagen ... format=json
```

but the error shows that Kafka Connect is configured with the AvroConverter (look at your properties file). If you want to generate Avro data, see the ksql-datagen documentation. For now, though, you are producing JSON, and that is not what you have configured the HDFS connector to read.
Avro is the default output format of HDFS Connect; from the configuration documentation:

> **format.class**
> The format class to use when writing data to the store. Format classes implement the `io.confluent.connect.storage.format.Format` interface.
> Type: class
> Default: `io.confluent.connect.hdfs.avro.AvroFormat`
> Importance: high

By default, these classes are available:

- `io.confluent.connect.hdfs.avro.AvroFormat`
- `io.confluent.connect.hdfs.json.JsonFormat`
- `io.confluent.connect.hdfs.parquet.ParquetFormat`
- `io.confluent.connect.hdfs.string.StringFormat`
If you aren't using JsonFormat then, in order to get Avro out of JSON input, I believe your JSON records would need to carry an embedded schema in the `{"schema": ..., "payload": ...}` envelope (see the sketch below); otherwise, an Avro schema cannot be inferred from a plain JSON record.
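For reference, a schemas-enabled JSON record (the shape that the JsonConverter with `schemas.enable=true` can turn into typed Connect data) looks roughly like this; the field names here are hypothetical:

```
{
  "schema": {
    "type": "struct",
    "name": "users",
    "optional": false,
    "fields": [
      {"field": "userid", "type": "string", "optional": false},
      {"field": "gender", "type": "string", "optional": true}
    ]
  },
  "payload": {"userid": "user_1", "gender": "FEMALE"}
}
```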
Going by your series of edits, I think you did switch to producing Avro, but you were using the JsonConverter, which, per the above, is not what I would suggest. Basically, the converter class type must match the data the producer wrote, and it defines the consumer deserializer.
As for the serialization error for id -1: it is basically saying that the data in either the key or the value is not Avro. Right now, KSQL cannot work with Avro keys, so I would bet it is the key deserializer that is failing. To fix that, set the key converter to a non-Avro converter such as `org.apache.kafka.connect.storage.StringConverter`.
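A minimal sketch of that override on the existing connector (assuming the hostnames from the question; `PUT /connectors/<name>/config` updates a connector in place and takes the bare config object):

```
# String keys, Avro values; value schemas are fetched from the registry
curl -X PUT http://kafka.nuvo.app:8083/connectors/hdfs-sink/config \
  -H "Content-Type: application/json" \
  --data '{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "tasks.max": "1",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://cdh.nuvo.app:8020",
    "flush.size": "3",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }'
```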