storm 1.0.1中的storm kafka客户端

cx6n0qe3  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(390)

基于storm文档支持的 KafkaSpout 基于旧的消费者api。我注意到外部包有另一个名为 storm-kafka-client .
https://github.com/apache/storm/tree/master/external/storm-kafka-client
目前还不清楚新的客户端是否在 1.0.1 生产准备就绪。有没有人有运行它的经验?

afdcj2ne

afdcj2ne1#

您可以为Storm1.1.0使用以下maven依赖项

<dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
         <version>1.1.0</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>log4j-over-slf4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

 <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.0.0</version>
</dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
         <version>0.9.0.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

您可能会面临更多的依赖性问题,您可以通过添加所需的jar来解决这些问题。
java代码中的依赖关系也将从org.backtype.storm.xx更改为org.apache.storm.xx

kyks70gy

kyks70gy2#

我把同样的问题贴到了暴风邮件列表上。新的api已经可以生产了。我们应该使用1.x分支。我打算和你一起测试

<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.0.1</version>
</dependency>

将更新进度。

gpnt7bae

gpnt7bae3#

下面的代码对我来说很好!!!

public TopologyBuilder myTopology() {

    TopologyBuilder builder = new TopologyBuilder();

    try {       
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = getKafkaSpoutConfig("KAFKA_IP:9092", KAFKA_TOPIC);          
        KafkaSpout kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
        builder.setSpout("kafkaSpout", kafkaSpout, 2 * 2);
        builder.setBolt("Bolt-1", new TestBolt(), parallelism).shuffleGrouping("kafkaSpout", KAFKA_TOPIC);
    } catch (Exception ex) {

    }

    return builder;
}

配置喷口。

protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers ,String topic) {
    ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
        (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
        new Fields("topic", "partition", "offset", "key", "value"), topic);

    Builder<String, String> builder = KafkaSpoutConfig.builder(bootstrapServers, new String[]{topic});

    return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG, topic)
        .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
        .setRetry(getRetryService())
        .setRecordTranslator(trans)
        .setOffsetCommitPeriodMs(10_000)
        .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
        .setMaxUncommittedOffsets(1000)
        .build();
}

对于配置失败的消息retyr逻辑

protected KafkaSpoutRetryService getRetryService() {
    return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
        TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
 }

相关问题