SpringCloudKafka:当两个处理器处于活动状态时,无法序列化输出流的数据

zd287kbt  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(345)

我有一个springcloudkafka流函数式编程风格的工作设置。有两个用例,通过 application.properties . 它们都是单独工作的,但当我同时激活它们时,第二个用例的输出流就会出现序列化错误:

Exception in thread "ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Error encountered sending record to topic outputActivities for task 0_0 due to:
...
Caused by: org.apache.kafka.common.errors.SerializationException:
Can't serialize data [com.example.connector.model.Activity@497b37ff] for topic [outputActivities]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
Incompatible types: declared root type ([simple type, class com.example.connector.model.Material]) vs com.example.connector.model.Activity

这里的最后一行很重要,因为“声明的根类型”来自 Material 类,但不是 Activity 类,这可能是源错误。
同样,当我在启动应用程序之前只激活第二个用例时,一切都正常。因此,我假设“material”处理器以某种方式与“activities”处理器(或其序列化程序)相互干扰,但我不知道何时何地。
设置
1.)用例:“材料”
一个输入流->转换->一个输出流

@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {...}
``` `application.properties` ```
spring.cloud.stream.kafka.streams.binder.functions.processMaterials.applicationId=MaterialsAppId
spring.cloud.stream.bindings.processMaterials-in-0.destination=inputMaterialsRaw
spring.cloud.stream.bindings.processMaterials-out-0.destination=outputMaterials

2.)用例:“活动”
两个输入流->连接->一个输出流

@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {...}
``` `application.properties` ```
spring.cloud.stream.kafka.streams.binder.functions.processActivities.applicationId=ActivitiesAppId
spring.cloud.stream.bindings.processActivities-in-0.destination=inputActivitiesRaw
spring.cloud.stream.bindings.processActivities-in-1.destination=inputAssignees
spring.cloud.stream.bindings.processActivities-out-0.destination=outputActivities

这两个处理器也被定义为流函数 application.properties : spring.cloud.stream.function.definition=processActivities;processMaterials 谢谢!
更新-下面是我如何使用代码中的处理器:
实施

// Material model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class MaterialRaw {
    private String id;
    private String name;
}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Material {
    private String id;
    private String name;
}

// Material processor
@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {
    return materialsRawStream -> materialsRawStream .map((recordKey, materialRaw) -> {
        // some transformation
        final var newId = materialRaw.getId() + "---foo";
        final var newName = materialRaw.getName() + "---bar";
        final var material = new Material(newId, newName);

        // output
        return new KeyValue<>(recordKey, material); 
    };
}
// Activity model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ActivityRaw {
    private String id;
    private String name;
}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Assignee {
    private String id;
    private String assignedAt;
}

/**
 * Combination of `ActivityRaw` and `Assignee`
 */
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Activity {
    private String id;
    private Integer number;
    private String assignedAt;
}

// Activity processor
@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {
    return (activitiesRawStream, assigneesStream) -> { 
        final var joinWindow = JoinWindows.of(Duration.ofDays(30));

        final var streamJoined = StreamJoined.with(
            Serdes.String(),
            new JsonSerde<>(ActivityRaw.class),
            new JsonSerde<>(Assignee.class)
        );

        final var joinedStream = activitiesRawStream.leftJoin(
            assigneesStream,
            new ActivityJoiner(),
            joinWindow,
            streamJoined
        );

        final var mappedStream = joinedStream.map((recordKey, activity) -> {
            return new KeyValue<>(recordKey, activity);
        });

        return mappedStream;
    };
}
wixjitnu

wixjitnu1#

您需要为每个函数指定要使用的绑定器 s.c.s.bindings..binder=... .
但是,如果没有这些,我会预料到一个错误,比如“找到了多个绑定,但没有指定默认值”,这就是消息通道绑定所发生的情况。

tez616oj

tez616oj2#

这是活页夹推断的一个问题 Serde 类型当有多个函数具有不同的出站目标类型时,其中一个具有 Activity 另一个是 Material 对你来说。我们必须在活页夹中说明这一点。我在这里制造了一个问题。
同时,您可以遵循此解决方法。
创建自定义 Serde 分类如下。

public class ActivitySerde extends JsonSerde<Activity> {}

然后,显式地使用 Serde 为您的 processActivities 使用配置的函数。
例如。,

spring.cloud.stream.kafka.streams.bindings.processActivities-out-0.producer.valueSerde=com.example.so65003575.ActivitySerde

如果您正在尝试此解决方法,请将程序包更改为相应的程序包。
这里是另一个推荐的方法。如果定义类型为的bean Serde 对于目标类型,它优先于绑定器对 KStream 类型。因此,您也可以这样做,而无需在上述解决方案中定义额外的类。

@Bean
public Serde<Activity> activitySerde() {
  return new JsonSerde(Activity.class);
}

以下是文件,其中解释了所有这些细节。

相关问题