apache-kafka Kafka - why do groupBy and reduce require a default serde for key and value?

Asked by 0tdrvxhp on 2022-11-01 in Apache

I'm developing a streams application with Quarkus. My application looks like this:

  1. flatMap to change the key and produce multiple messages from a single message.
  2. join a KTable, matching on the key from step 1.
  3. transform for a stateful operation.
  4. flatMap to change the key back to the original key, i.e. the key before step 1.
  5. groupBy with the key from step 4, which is effectively the key from step 1.
  6. reduce to "merge" the records into a single message containing a JSON array.

The net effect is to split an incoming message (with key id1) into multiple messages with different keys (e.g. k1, k2, etc.), enrich each message using the join and transform, then change each message's key back to id1, and finally "merge" the enriched messages into a single message with key id1 (see the sketch below).
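A minimal sketch of such a topology, assuming String keys and values throughout and hypothetical topic names (input-topic, lookup-topic, output-topic); the stateful transform from step 3 is elided:

import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> lookup = builder.table("lookup-topic",
    Consumed.with(Serdes.String(), Serdes.String()));

builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    // 1. flatMap: change the key and fan one message out into several
    .flatMap((id, v) -> List.of(KeyValue.pair("k1:" + id, v),
                                KeyValue.pair("k2:" + id, v)))
    // 2. join a KTable on the new key (explicit serdes for the repartition topic)
    .join(lookup, (v, t) -> v + "|" + t,
          Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
    // 3. the stateful transform would go here; elided in this sketch
    // 4. flatMap: restore the original key from before step 1
    .flatMap((k, v) -> List.of(KeyValue.pair(k.substring(3), v)))
    // 5 + 6. group by the restored key and merge values into one record,
    //        again with explicit serdes so no default serde is required
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    .reduce((agg, v) -> agg + "," + v,
            Materialized.with(Serdes.String(), Serdes.String()))
    .toStream()
    .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));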
I keep running into errors about the default key serde and value serde not being set. I know the default serdes can be set in application.properties, but it's not clear to me why this error occurs in the first place.
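For reference, a minimal sketch of the plain Kafka Streams equivalent of setting the defaults in application.properties; the String serdes here are only placeholders for whatever key/value types the application actually uses:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

// Default serdes apply to every operation that is not given an explicit serde.
Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);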
Note that the application runs successfully if I leave out steps 5 and 6.
This is the Java exception I get:
2022-10-17 16:42:34,884 ERROR [org.apa.kaf.str.KafkaStreams] (app-alerts-6a7c4df8-7813-4d5d-9a86-d6f3db7c8ef0-StreamThread-1) stream-client [app-alerts-6a7c4df8-7813-4d5d-9a86-d6f3db7c8ef0] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. : org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
Caused by: org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
        at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1587)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:90)
        at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
        at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
        at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:195)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:144)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:212)
        at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:97)
        at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:231)
        at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:454)
        at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:865)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:747)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
        ... 1 more

These are the StreamsConfig values:

        acceptable.recovery.lag = 10000
        application.id = machine-alerts
        application.server = 
        bootstrap.servers = [kafka:9092]
        buffered.records.per.partition = 1000
        built.in.metrics.version = latest
        cache.max.bytes.buffering = 10240
        client.id = 
        commit.interval.ms = 1000
        connections.max.idle.ms = 540000
        default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
        default.dsl.store = rocksDB
        default.key.serde = null
        default.list.key.serde.inner = null
        default.list.key.serde.type = null
        default.list.value.serde.inner = null
        default.list.value.serde.type = null
        default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
        default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
        default.value.serde = null
        max.task.idle.ms = 0
        max.warmup.replicas = 2
        metadata.max.age.ms = 500
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = DEBUG
        metrics.sample.window.ms = 30000
        num.standby.replicas = 0
        num.stream.threads = 1
        poll.ms = 100
        probing.rebalance.interval.ms = 600000
        processing.guarantee = at_least_once
        rack.aware.assignment.tags = []
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        repartition.purge.interval.ms = 30000
        replication.factor = -1
        request.timeout.ms = 40000
        retries = 0
        retry.backoff.ms = 100
        rocksdb.config.setter = null
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        state.cleanup.delay.ms = 600000
        state.dir = /tmp/kafka-streams
        task.timeout.ms = 300000
        topology.optimization = none
        upgrade.from = null
        window.size.ms = null
        windowed.inner.class.serde = null
        windowstore.changelog.additional.retention.ms = 86400000

m1m5dgzv1#

I don't know how Quarkus works, but Serde errors in groupBy statements are common when you start working with Kafka Streams. A groupBy statement creates an internal repartition topic where the data is grouped by key (internally, your streams application sends messages to that topic), so if the key or value type differs from the default key and value Serdes in the stream properties, you should specify the Serdes for the groupBy statement at the code level.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;

// Grouped replaces the deprecated Serialized API
KGroupedStream<String, User> groupedStream = stream.groupByKey(
    Grouped.with(
        Serdes.String(),          /* key serde */
        new CustomUserSerde()));  /* value serde */

In the example above, I supply the required Serdes in the grouping statement itself instead of at the properties level.
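The same reasoning extends to the reduce in the question: the stack trace fails in MeteredKeyValueStore.initStoreSerde, i.e. when the reduce state store looks up its serdes. These can also be supplied explicitly via Materialized instead of relying on the defaults; a minimal sketch, assuming a KStream<String, String> named stream:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

// Explicit serdes for both the repartition topic (Grouped) and the
// reduce state store and its changelog (Materialized); no defaults needed.
KTable<String, String> merged = stream
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    .reduce((aggregate, value) -> aggregate + "," + value,
            Materialized.with(Serdes.String(), Serdes.String()));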
