我阅读了所有关于camel-kafka的文档,我读到的唯一方法是git和指定
public void configure() throws Exception {
from("kafka:" + TOPIC
+ "?groupId=A"
+ "&autoOffsetReset=earliest" // Ask to start from the beginning if we have unknown offset
+ "&consumersCount=2" // We have 2 partitions, we want 1 consumer per partition
+ "&offsetRepository=#offset") // Keep the offset in our repository
.to("mock:result");
}
但是对于客户机的订单,我需要使用spring,所以我对kafka的端点是
<!--DEFINE KAFKA'S TOPCIS AS ENDPOINT-->
<endpoint id="tagBlink" uri="kafka:10.0.0.165:9092">
<property key="topic" value="tagBlink"/>
<property key="brokers" value="10.0.0.165:9092"/>
<property key="offsetRepository" value="100"/>
</endpoint>
但有个例外
找不到合适的setter属性:offsetrepository,因为没有具有相同类型的setter方法:java.lang.string,也不可能进行类型转换:没有类型转换器可用于从类型:java.lang.string转换为值为100的所需类型:org.apache.camel.spi.staterepository
我现在的配置可以吗?如何从特定偏移量恢复?
2条答案
按热度按时间z31licg01#
在这段时间之后,我设法解决了这个问题。为此,我遵循了springbean的创建过程,并检查了
FileStateRepository
我需要一个文件,所以我创建了一个文件bean并添加为构造函数arg。之后我加了一个init-method="doStart"
. 此方法加载一个文件(如果存在),如果不存在,它将创建该文件。在这之后,我在git中看到了camel的kafkaconsumer的代码。
用这个我设法从最后一个偏移量读出来。我希望 Camel 文档为Kafka添加这些额外的步骤。
d5vmydt92#
重要的词是“repository”而不是“offset”:它不是一个整数值,但它是对bean的引用,用于指定偏移的持久化位置。
(非Spring)示例