java 如何在Spring Integration Kafka Message中设置ID头?

ozxc1zmp  于 2023-06-04  发布在  Java
关注(0)|答案(1)|浏览(241)

我有一个演示Spring Integration项目,它接收Kafka消息,聚合它们,然后发布它们。我正在尝试将JdbcMessageStore添加到项目中。问题是它失败了,错误是:

Caused by: java.lang.IllegalArgumentException: Cannot store messages without an ID header
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.15.RELEASE.jar:5.2.15.RELEASE]
    at org.springframework.integration.jdbc.store.JdbcMessageStore.addMessage(JdbcMessageStore.java:314) ~[spring-integration-jdbc-5.3.8.RELEASE.jar:5.3.8.RELEASE]

调试后,我发现它需要此消息中的UUID头id。但问题是我不能手动设置Kafka头ID -它是禁止的(与timestamp头相同)-我试图在不同项目的Kafka producer中这样做。
如果我使用名为Big Data Tools的IDEA插件并从那里发送消息,我可以设置id头,但它被我的项目作为字节数组接收,并且失败并出错

IllegalArgumentException Incorrect type specified for header 'id'. Expected [UUID] but actual type is [B]

我找不到解决这个问题的方法。我需要以某种方式设置这个id头,以便能够在数据库中存储消息。先谢谢你了

wko9yo5t

wko9yo5t1#

KafkaMessageDrivenChannelAdapter有一个选项:

/**
 * Set the message converter to use with a record-based consumer.
 * @param messageConverter the converter.
 */
public void setRecordMessageConverter(RecordMessageConverter messageConverter) {

在这里,您可以使用以下命令设置MessagingMessageConverter

/**
 * Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
 * will try to use a default value. By default set to {@code false}.
 * @param generateMessageId true if a message id should be generated
 */
public void setGenerateMessageId(boolean generateMessageId) {
    this.generateMessageId = generateMessageId;
}

/**
 * Generate {@code timestamp} for produced messages. If set to {@code false}, -1 is
 * used instead. By default set to {@code false}.
 * @param generateTimestamp true if a timestamp should be generated
 */
public void setGenerateTimestamp(boolean generateTimestamp) {
    this.generateTimestamp = generateTimestamp;
}

设置为true
这样,从ConsumerRecord创建的Message将具有相应的idtimestamp头。
您也可以简单地使用一个“虚拟”转换器来返回传入的有效负载,框架将创建一个新的Message,并在其中生成这些头文件。

相关问题