当我在Flume中同时使用Kafka通道和KafkaFlume时,几分钟后我得到了这个异常
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:255)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
此异常在某些事件成功传递后引发。
如果我使用记忆通道而不是Kafka通道或loggersink而不是Kafka接收器一切都很好。即使我使用一个Kafka集群作为Kafka频道,另一个Kafka集群作为Kafka接收器,一切都很好。
flume版本为:1.7.0
Kafka版本是:0.10.2.0
1条答案
按热度按时间dbf7pr2w1#
我在一台机器上用了Flume和Kafka。
我把flume频道主题分区从4改为1,问题就解决了。