基于storm文档支持的 KafkaSpout 基于旧的消费者api。我注意到外部包有另一个名为 storm-kafka-client .https://github.com/apache/storm/tree/master/external/storm-kafka-client目前还不清楚新的客户端是否在 1.0.1 生产准备就绪。有没有人有运行它的经验?
KafkaSpout
storm-kafka-client
1.0.1
afdcj2ne1#
您可以为Storm1.1.0使用以下maven依赖项
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.0</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.9.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency>
您可能会面临更多的依赖性问题,您可以通过添加所需的jar来解决这些问题。java代码中的依赖关系也将从org.backtype.storm.xx更改为org.apache.storm.xx
kyks70gy2#
我把同样的问题贴到了暴风邮件列表上。新的api已经可以生产了。我们应该使用1.x分支。我打算和你一起测试
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>1.0.1</version> </dependency>
将更新进度。
gpnt7bae3#
下面的代码对我来说很好!!!
public TopologyBuilder myTopology() { TopologyBuilder builder = new TopologyBuilder(); try { KafkaSpoutConfig<String, String> kafkaSpoutConfig = getKafkaSpoutConfig("KAFKA_IP:9092", KAFKA_TOPIC); KafkaSpout kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig); builder.setSpout("kafkaSpout", kafkaSpout, 2 * 2); builder.setBolt("Bolt-1", new TestBolt(), parallelism).shuffleGrouping("kafkaSpout", KAFKA_TOPIC); } catch (Exception ex) { } return builder; }
配置喷口。
protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers ,String topic) { ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>( (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()), new Fields("topic", "partition", "offset", "key", "value"), topic); Builder<String, String> builder = KafkaSpoutConfig.builder(bootstrapServers, new String[]{topic}); return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG, topic) .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE) .setRetry(getRetryService()) .setRecordTranslator(trans) .setOffsetCommitPeriodMs(10_000) .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST) .setMaxUncommittedOffsets(1000) .build(); }
对于配置失败的消息retyr逻辑
protected KafkaSpoutRetryService getRetryService() { return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500), TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); }
3条答案
按热度按时间afdcj2ne1#
您可以为Storm1.1.0使用以下maven依赖项
您可能会面临更多的依赖性问题,您可以通过添加所需的jar来解决这些问题。
java代码中的依赖关系也将从org.backtype.storm.xx更改为org.apache.storm.xx
kyks70gy2#
我把同样的问题贴到了暴风邮件列表上。新的api已经可以生产了。我们应该使用1.x分支。我打算和你一起测试
将更新进度。
gpnt7bae3#
下面的代码对我来说很好!!!
配置喷口。
对于配置失败的消息retyr逻辑