同时使用Kafka通道和KafkaFlume时出现Flume异常

lx0bsm1f  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(440)

当我在Flume中同时使用Kafka通道和KafkaFlume时,几分钟后我得到了这个异常

java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
    at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
    at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:255)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)

此异常在某些事件成功传递后引发。
如果我使用记忆通道而不是Kafka通道或loggersink而不是Kafka接收器一切都很好。即使我使用一个Kafka集群作为Kafka频道,另一个Kafka集群作为Kafka接收器,一切都很好。
flume版本为:1.7.0
Kafka版本是:0.10.2.0

dbf7pr2w

dbf7pr2w1#

我在一台机器上用了Flume和Kafka。
我把flume频道主题分区从4改为1,问题就解决了。

相关问题