java Flink Kafka消费者:无法获取主题的元数据

velaa5lx  于 2023-03-28  发布在  Java
关注(0)|答案(1)|浏览(911)

我有一个flink代码,它从Kafka获取数据并打印到控制台。
在一个只有flink代码的简单代码中,它正确地打印数据。
当我将代码转换为类结构时,它会停止代码并开始给出下面的消息

[SourceCoordinator-Source: Alert Router] ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext - Exception while handling result from async call in SourceCoordinator-Source: Alert Router. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to 
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [alerts_consumer_group_prod].
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
    ... 9 more
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.

服务器托管主题和分区(已检查多次)
我的代码:

配置属性

bootstrapServers:localhost:9092,localhost:9095,localhost:9097,localhost:9099
allAlertsConsumerGroupID:alerts_consumer_group_prod
allAlertsTopic:<some topic>
allAlertsTopicParallelism:2

主类

package allEvents;

import java.io.FileInputStream;
import java.util.Properties;

public class AllEvents {
    public static void main(String[] args) throws Exception {
        String propertiesFile = "./src/main/resources/config.properties";
        Properties properties = new Properties();
        FileInputStream fileInputStream = new FileInputStream(propertiesFile);
        properties.load(fileInputStream);

        String bootstrapServer = properties.getProperty("bootstrapServers");
        String alertTopic = properties.getProperty("allAlertsTopic");
        String alertConsumerGroupID = properties.getProperty("allAlertsConsumerGroupID");
        int alertTopicParallelism = Integer.parseInt(properties.getProperty("allAlertsTopicParallelism"));
        System.out.println("Starting: " + bootstrapServer + ", Topic: " + alertTopic + ", ConsumerGroup: " + alertConsumerGroupID + ", Parallelism: " + alertTopicParallelism);

        AlertRouter alertRouter = new AlertRouter(bootstrapServer, alertTopic, alertConsumerGroupID, alertTopicParallelism);
        alertRouter.routeAlerts();
    }
}

告警路由器

package allEvents;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import javax.xml.crypto.Data;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;

public class AlertRouter {
    private String bootstrapServer;
    private String consumerGroupID;
    private String topic;
    private int parallelism;

    public AlertRouter(String bootstrapServer, String consumerGroupID, String topic, int parallelism) {
        this.bootstrapServer = bootstrapServer;
        this.consumerGroupID = consumerGroupID;
        this.topic = topic;
        this.parallelism = parallelism;
    }

    public void routeAlerts() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(this.parallelism);

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServer);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.consumerGroupID);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        System.out.println("Inside route Alerts: " + properties);

        String propertiesFile = "./src/main/resources/config.properties";
        Properties fileProps = new Properties();
        FileInputStream fileInputStream = new FileInputStream(propertiesFile);
        fileProps.load(fileInputStream);

        KafkaSource<String> alertSourceKafka = KafkaSource.<String>builder()
                .setBootstrapServers(this.bootstrapServer)
                .setTopics(this.topic)
                .setProperties(properties)
                .setGroupId(this.consumerGroupID)
                .setStartingOffsets(OffsetsInitializer.latest())
           .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        System.out.println("Kafka Source: " + this.bootstrapServer);

        DataStream<String> alertsStream = env.fromSource(alertSourceKafka, WatermarkStrategy.noWatermarks(), "Alert Router");

        alertsStream.print("Alert Stream");
        env.execute();
    }
}

有人能帮我理解一下这个问题吗?

w8ntj3qf

w8ntj3qf1#

请注意构造函数的顺序

public AlertRouter(String bootstrapServer, String consumerGroupID, String topic

还有召唤

new AlertRouter(bootstrapServer, alertTopic, alertConsumerGroupID

仔细阅读日志,您会发现您的消费者组名称并不作为主题存在

相关问题