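I have Flink code that reads data from Kafka and prints it to the console.
As a simple standalone Flink program, it prints the data correctly.
After I refactored the code into a class structure, the job stops and starts logging the following message: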
[SourceCoordinator-Source: Alert Router] ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext - Exception while handling result from async call in SourceCoordinator-Source: Alert Router. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234)
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [alerts_consumer_group_prod].
at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
... 9 more
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
The server does host the topic and its partitions (I have checked several times).
My code:
Configuration properties
bootstrapServers:localhost:9092,localhost:9095,localhost:9097,localhost:9099
allAlertsConsumerGroupID:alerts_consumer_group_prod
allAlertsTopic:<some topic>
allAlertsTopicParallelism:2
Main class
package allEvents;

import java.io.FileInputStream;
import java.util.Properties;

public class AllEvents {
    public static void main(String[] args) throws Exception {
        String propertiesFile = "./src/main/resources/config.properties";
        Properties properties = new Properties();
        FileInputStream fileInputStream = new FileInputStream(propertiesFile);
        properties.load(fileInputStream);
        String bootstrapServer = properties.getProperty("bootstrapServers");
        String alertTopic = properties.getProperty("allAlertsTopic");
        String alertConsumerGroupID = properties.getProperty("allAlertsConsumerGroupID");
        int alertTopicParallelism = Integer.parseInt(properties.getProperty("allAlertsTopicParallelism"));
        System.out.println("Starting: " + bootstrapServer + ", Topic: " + alertTopic + ", ConsumerGroup: " + alertConsumerGroupID + ", Parallelism: " + alertTopicParallelism);
        AlertRouter alertRouter = new AlertRouter(bootstrapServer, alertTopic, alertConsumerGroupID, alertTopicParallelism);
        alertRouter.routeAlerts();
    }
}
AlertRouter class
package allEvents;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import javax.xml.crypto.Data;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;

public class AlertRouter {
    private String bootstrapServer;
    private String consumerGroupID;
    private String topic;
    private int parallelism;

    public AlertRouter(String bootstrapServer, String consumerGroupID, String topic, int parallelism) {
        this.bootstrapServer = bootstrapServer;
        this.consumerGroupID = consumerGroupID;
        this.topic = topic;
        this.parallelism = parallelism;
    }

    public void routeAlerts() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(this.parallelism);
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServer);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.consumerGroupID);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        System.out.println("Inside route Alerts: " + properties);
        String propertiesFile = "./src/main/resources/config.properties";
        Properties fileProps = new Properties();
        FileInputStream fileInputStream = new FileInputStream(propertiesFile);
        fileProps.load(fileInputStream);
        KafkaSource<String> alertSourceKafka = KafkaSource.<String>builder()
                .setBootstrapServers(this.bootstrapServer)
                .setTopics(this.topic)
                .setProperties(properties)
                .setGroupId(this.consumerGroupID)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();
        System.out.println("Kafka Source: " + this.bootstrapServer);
        DataStream<String> alertsStream = env.fromSource(alertSourceKafka, WatermarkStrategy.noWatermarks(), "Alert Router");
        alertsStream.print("Alert Stream");
        env.execute();
    }
}
Can someone help me understand this problem?
1 Answer
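Pay attention to the parameter order of the constructor
and to the argument order of the call.
Read the log carefully and you will see that your consumer group name does not exist as a topic: the constructor declares (bootstrapServer, consumerGroupID, topic, parallelism), but main passes (bootstrapServer, alertTopic, alertConsumerGroupID, alertTopicParallelism), so the group ID alerts_consumer_group_prod ends up being used as the topic name.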
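The mismatch can be reproduced without Flink or Kafka. The sketch below is only an illustration: `ArgumentOrderDemo` and `RouterConfig` are hypothetical names, `RouterConfig` merely mirrors the parameter order of the `AlertRouter` constructor from the question, and `some_alert_topic` is a placeholder because the real topic name is elided in the question. It shows that the call as written stores the group ID in the `topic` field, and that swapping the second and third arguments restores the intended values.

```java
public class ArgumentOrderDemo {

    // Hypothetical stand-in for AlertRouter: same constructor parameter order
    // as in the question, but with no Flink or Kafka dependencies.
    public static final class RouterConfig {
        public final String bootstrapServer;
        public final String consumerGroupID;
        public final String topic;
        public final int parallelism;

        public RouterConfig(String bootstrapServer, String consumerGroupID, String topic, int parallelism) {
            this.bootstrapServer = bootstrapServer;
            this.consumerGroupID = consumerGroupID;
            this.topic = topic;
            this.parallelism = parallelism;
        }
    }

    public static void main(String[] args) {
        String bootstrapServer = "localhost:9092";
        String alertTopic = "some_alert_topic"; // placeholder, the real name is not given in the question
        String alertConsumerGroupID = "alerts_consumer_group_prod";
        int alertTopicParallelism = 2;

        // Call as written in the question: topic is passed where the group ID is expected.
        RouterConfig asWritten =
                new RouterConfig(bootstrapServer, alertTopic, alertConsumerGroupID, alertTopicParallelism);
        System.out.println("As written -> topic: " + asWritten.topic
                + ", group: " + asWritten.consumerGroupID);

        // Corrected call: arguments follow the constructor's declared order.
        RouterConfig corrected =
                new RouterConfig(bootstrapServer, alertConsumerGroupID, alertTopic, alertTopicParallelism);
        System.out.println("Corrected  -> topic: " + corrected.topic
                + ", group: " + corrected.consumerGroupID);
    }
}
```

In the question's code, the equivalent fix would presumably be to call `new AlertRouter(bootstrapServer, alertConsumerGroupID, alertTopic, alertTopicParallelism)` in `main`, or alternatively to reorder the constructor parameters to match the existing call. Because both values are `String`s, the compiler cannot catch this kind of swap.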