我已经建立了一个streams应用程序,它使用来自一个主题的消息,转换它们并将其放到另一个主题,如果序列化过程中发生任何错误,它会将记录放到错误主题。信息量巨大(以百万计)。stream应用程序运行得非常好,直到几天之后,我们加载了大约7000万个数据,它仍然运行得很好,然后前天我们在同一个应用程序中添加了另一个流,并开始流式传输数据,现在应用程序崩溃,出现了异常。每个流都分配了不同的主题和消费者组。应用程序运行了一个小时左右,然后由于“java.lang.outofmemoryerror:java heap space”错误而崩溃。这个应用程序的行为非常奇怪,我们将每个节点上的堆大小(xmx)增加到2g,我们的拓扑结构是2个节点正在运行该应用程序,该应用程序连接到kafka broker,kafka broker正在运行3个节点。没有网络问题,但我经常看到“尝试心跳失败,因为组正在重新平衡”和仅在新添加流的日志中发生的使用者重新平衡。
Kafka客户端版本-2.3.1Kafka代理-2.11
Kafka流配置:
```props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, SendAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProductionExceptionHandler.class);
props.put(ProducerConfig.RETRIES_CONFIG, "1");
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Streams creation code:
```@Bean
public Set<KafkaStreams> kStreamJson(StreamsBuilder builder) {
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
final KStream<String, JsonNode> infoStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), jsonSerde));
Properties infoProps = kStreamsConfigs().asProperties();
infoProps.put(StreamsConfig.APPLICATION_ID_CONFIG, migrationMOIProfilesGroupId);
infoStream
.map(IProcessX::process)
.through(
outputTopic,
Produced.with(Serdes.String(), new JsonPOJOSerde<>(Message.class)));
return Sets.newHashSet(
new KafkaStreams(builder.build(), infoProps)
);
}
收到的错误:
[8/6/20, 22:22:54:070 GST] 00000076 SystemOut O 2020-08-06 22:22:54.070 INFO 83225 --- [s-streams-group] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=streams-group-5c15c2c1-798f-4b4d-91a8-24bd9a093fe6-StreamThread-18-consumer, groupId=streams-group] Discovered group coordinator kafka.broker:9092 (id: 2147483644 rack: null)
[8/6/20, 22:23:22:831 GST] 00000d95 SystemOut O 2020-08-06 21:27:59.979 ERROR 83225 --- [| producer-3356] o.apache.kafka.common.utils.KafkaThread : Uncaught exception in thread 'kafka-producer-network-thread | producer-3356':
java.lang.OutOfMemoryError: Java heap space
我检查了session.timeout.ms、heartbeat.timeout.ms、max.poll.interval.ms、max.poll.records,但不确定要为它们设置什么值。
请帮我解决这个问题。
暂无答案!
目前还没有任何答案,快来回答吧!