我正在使用Quarkus开发一个流应用程序。我的应用程序如下所示。
flatMap
以更改密钥并从单个消息生成多个消息。
1.使用步骤1中的密钥将join
与KTable
进行匹配。transform
表示有状态操作。flatMap
将密钥更改回原始密钥,即步骤1之前的密钥。groupBy
与步骤4中的密钥相同,实际上是步骤1中的密钥。reduce
将记录“合并”到包含JSON数组的单个消息中。
实际效果是拆分传入的消息(将key
与id1
合并为多条消息)(使用不同的密钥,例如k1
、k2
等)。使用join
和transform
增强每条消息。然后,将每条消息的密钥改回id1
。最后,将每个增强消息“合并”成密钥为id1
的单个消息。
我在设置默认键serde和值serde时总是遇到错误。虽然默认的serde可以在application.properties
中设置,但我不清楚为什么会出现这种错误?
请注意,如果我不执行步骤5和步骤6,应用程序将成功运行。
这是我得到的Java异常。
2022-10-17 16:42:34,884 ERROR [org.apa.kaf.str.KafkaStreams] (app-alerts-6a7c4df8-7813-4d5d-9a86-d6f3db7c8ef0-StreamThread-1) stream-client [app-alerts-6a7c4df8-7813-4d5d-9a86-d6f3db7c8ef0] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. : org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
Caused by: org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1587)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:90)
at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:195)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:144)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:212)
at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:97)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:231)
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:454)
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:865)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:747)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
... 1 more
这些是StreamsConfig
值:
acceptable.recovery.lag = 10000
application.id = machine-alerts
application.server =
bootstrap.servers = [kafka:9092]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10240
client.id =
commit.interval.ms = 1000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.dsl.store = rocksDB
default.key.serde = null
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = null
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 500
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = DEBUG
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.guarantee = at_least_once
rack.aware.assignment.tags = []
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
repartition.purge.interval.ms = 30000
replication.factor = -1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
task.timeout.ms = 300000
topology.optimization = none
upgrade.from = null
window.size.ms = null
windowed.inner.class.serde = null
windowstore.changelog.additional.retention.ms = 86400000
1条答案
按热度按时间m1m5dgzv1#
我不知道Quarkus是如何工作的,但是,当你开始使用Kafka Streams时,groupBy语句中的Serde错误是很常见的。groupBy语句创建了一个内部的压缩主题,其中的数据将按键分组,(在内部,您的流应用程序将向该主题发送消息)因此,如果值或键的类型与Stream属性中的默认键和值Serde不同,则应该在代码级别为GroupBy语句指定Serde。
在上面的示例中,我在group语句中使用了所需的Serdes,而不是在属性级别使用。