如何在Storm上制作同步Kafka喷口

q3aa0525  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(223)

我试图使Kafka消费者同步消费Kafka的信息。
我遇到的实际问题是消息队列被存储在Storm Spout中。
我想做的是让Storm等待Kafka的确认回复,然后才让Storm使用下一条消息。
我正在使用Storm KafkaSpout:

/**
     * Creates a configured kafka spout.
     * @param topicName Topic where the kafka spout subscribes
     * @return An instance of configured KafkaSpout
     */
    public KafkaSpout getkafkaSpout(String topicName){
        return new KafkaSpout(this.getSpoutConfig(topicName));
    }

    /**
     * Create the necessary configuration to create a new kafka spout.
     * @param topicName Topic where the kafka spout subscribes
     * @return Spout configuration
     */
    public SpoutConfig getSpoutConfig(String topicName) {
        SpoutConfig spoutConfig=new SpoutConfig(this.getZkHosts(),topicName, "", String.join("-",topicName,RandomStringUtils.randomAlphanumeric(20)));
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
        return spoutConfig;
    }


builder.setSpout("kafkaPackedData", stormConfig.getkafkaSpout("topic_kafka"), 2);

我已经更新到Storm 2.0.0,我使用storm-kafka-client。但是如果我将Storm队列配置为50:setMaxSpoutPending(50);当我向Kafka发送许多数据时,它会停止使用。
我已经为Kafka consumer配置了下一个配置:

KafkaSpoutConfig spoutConf =  KafkaSpoutConfig.builder("stream1:9092", "kafkaToStormAlarms")
                    .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000") //Set session timeout
                    .setProp(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000") //Set request timeout
                    .setOffsetCommitPeriodMs(10000)    //Set automatic confirmation time (in ms)
                    .setFirstPollOffsetStrategy(LATEST)    //Set to pull the latest messages
                    .setRetry(kafkaSpoutRetryService)
                    .build();

当Storm使用了50条与MaxSpoutPending配置相同的消息时,它将停止使用更多的消息。也许下一个bolt没有正确发送ACK?我在KafkaConsumerSpout之后使用下一个bolt:

public class testBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("MQTTmessage"));
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector boc) {
        System.out.println("\n\n\n\nLLEGA BIENN AL SPLIT TEXT BOLT\n\n");
        System.out.println("TUPLE "+tuple);
        String text = tuple.getString(4);
        List<String> lines = Arrays.asList(text.split("\\r?\\n"));

        lines.forEach(line -> {
            boc.emit(new Values(line));
        });
    }
}
xtfmy6hx

xtfmy6hx1#

关于喷口节流:是的,您可以通过将拓扑配置中的topology.max.spout.pending选项设置为1来实现这一点。如果您希望获得良好的吞吐量,我并不真正建议您这样做,但我假设您已经仔细考虑了为什么需要拓扑以这种方式工作。
关于新喷嘴:Kafka运行的服务器是stream1:9092吗?您要发送的主题是kafkaToStormAlarms吗?如果不是,那可能是您的问题。否则,检查storm/logs/workers-artifacts中的worker日志,它可能会告诉您为什么spout不发送任何内容。
最后是的,你绝对应该使用storm-kafka-client而不是storm-kafka,否则你将无法升级到Storm 2.0.0,或者最新的Kafka版本。

相关问题