Kafka s3接收器连接器在获取空数据时崩溃

n53p2ov0  于 2022-12-03  发布在  Apache
关注(0)|答案(2)|浏览(144)

在源连接器发送NULL值之前,我有一个工作的s3接收器连接器; s3连接器崩溃。从MS SQL数据库中删除记录时出现问题。源连接器将删除信息传送到s3连接器,s3连接器崩溃。我删除了s3连接器,然后使用不同的名称重新创建了s3连接器,但没有任何更改。

org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
        at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
        at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
        at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-05-24 10:10:50,577 WARN WorkerSinkTask{id=minio-connector1-0} Ignoring invalid task provided offset filesql1.dbo.Files-0/OffsetAndMetadata{offset=16, leaderEpoch=null, metadata=''} -- not yet consumed, taskOffset=16 currentOffset=0 (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-minio-connector1-0]
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
        at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
        at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
        at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
        ... 10 more
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]

...这是我的s3连接器配置:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: "minio-connector1"
  labels:
    strimzi.io/cluster: mssql-minio-connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    storage.class: io.confluent.connect.s3.storage.S3Storage  
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    tasks.max: '1'
    topics: filesql1.dbo.Files
    s3.bucket.name: dosyalar
    s3.part.size: '5242880'
    flush.size: '2'
    format: binary
    schema.compatibility: NONE
    max.request.size: "536870912"
    store.url: http://minio.dev-kik.io
    format.class: io.confluent.connect.s3.format.avro.AvroFormat
    key.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
    internal.key.converter: org.apache.kafka.connect.json.JsonConverter
    internal.value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator

我有两个问题:
1)如何使s3连接器再次运行?
2)不能期望不从源数据库中删除记录。如何防止s3连接器再次崩溃?

rhfm7lfc

rhfm7lfc1#

请查看连接器文档并查找behavior.on.null.values。您可以将其设置为ignore

slmsl1lt

slmsl1lt2#

查看页面:
https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#storage

应设置适当的连接器值以忽略它:

behavior.on.null.values=ignore

相关问题