java—从ApacheCamel中的特定偏移量开始读取kafka主题

9jyewag0  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(381)

我阅读了所有关于camel-kafka的文档,我读到的唯一方法是git和指定

public void configure() throws Exception {
                    from("kafka:" + TOPIC
                                 + "?groupId=A"
                                 + "&autoOffsetReset=earliest"             // Ask to start from the beginning if we have unknown offset
                                 + "&consumersCount=2"                     // We have 2 partitions, we want 1 consumer per partition
                                 + "&offsetRepository=#offset")            // Keep the offset in our repository
                            .to("mock:result");

}

但是对于客户机的订单,我需要使用spring,所以我对kafka的端点是

<!--DEFINE KAFKA'S TOPCIS AS ENDPOINT-->
        <endpoint id="tagBlink" uri="kafka:10.0.0.165:9092">
            <property key="topic" value="tagBlink"/>
            <property key="brokers" value="10.0.0.165:9092"/>
            <property key="offsetRepository" value="100"/>
        </endpoint>

但有个例外
找不到合适的setter属性:offsetrepository,因为没有具有相同类型的setter方法:java.lang.string,也不可能进行类型转换:没有类型转换器可用于从类型:java.lang.string转换为值为100的所需类型:org.apache.camel.spi.staterepository
我现在的配置可以吗?如何从特定偏移量恢复?

z31licg0

z31licg01#

在这段时间之后,我设法解决了这个问题。为此,我遵循了springbean的创建过程,并检查了 FileStateRepository 我需要一个文件,所以我创建了一个文件bean并添加为构造函数arg。之后我加了一个 init-method="doStart" . 此方法加载一个文件(如果存在),如果不存在,它将创建该文件。

<endpoint id="event" uri="kafka:localhost:9092">
        <property key="topic" value="eventTopic4"/>
        <property key="brokers" value="localhost:9092"/>
        <property key="autoOffsetReset" value="earliest"/>
        <property key="offsetRepository" value="#myRepo2"/>
    </endpoint>

    <bean id="myFileOfMyRepo" class="java.io.File">
        <constructor-arg type="java.lang.String" value="C:\repoDat\repo.dat"/>
    </bean>

    <bean id="myRepo2" class="org.apache.camel.impl.FileStateRepository " factory-method="fileStateRepository" init-method="doStart">
        <constructor-arg ref="myFileOfMyRepo"/>
    </bean>

在这之后,我在git中看到了camel的kafkaconsumer的代码。

offsetRepository.getState(serializeOffsetKey(topicPartition));
    if (offsetState != null && !offsetState.isEmpty()) {
        // The state contains the last read offset so you need to seek from the next one
        long offset = deserializeOffsetValue(offsetState) + 1;
        log.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
        consumer.seek(topicPartition, offset);
    }

用这个我设法从最后一个偏移量读出来。我希望 Camel 文档为Kafka添加这些额外的步骤。

d5vmydt9

d5vmydt92#

重要的词是“repository”而不是“offset”:它不是一个整数值,但它是对bean的引用,用于指定偏移的持久化位置。
(非Spring)示例

// Create the repository in which the Kafka offsets will be persisted
FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));

// Bind this repository into the Camel registry
JndiRegistry registry = new JndiRegistry();
registry.bind("offsetRepo", repository);

// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                     "&groupId=A" +                            //
                     "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
                     "&offsetRepository=#offsetRepo")          // Keep the offsets in the previously configured repository
                .to("mock:result");
    }
});

相关问题