enablebinding、output、input自3.1版spring cloud stream以来已弃用

jdg4fx2g  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(612)

由于版本3.1,不推荐使用用于处理队列的主要api。在课堂评论中说:
从3.1开始就不推荐使用函数式编程模型
我在网上搜索了很多解决方案,但没有找到一个关于如何迁移的可靠的e2e解释。
寻找以下示例:
从队列读取
写入队列
如果有一些方法可以做到这一点(正如我在web上看到的),我很乐意为每个选项提供一个解释和典型的用例。

nhhxz33t

nhhxz33t1#

我假设您已经熟悉了主要概念,并将重点介绍迁移。
我使用kotlin作为演示代码,以减少冗长
首先,一些参考资料可能会有所帮助:
以下是初始的相关doc:link
这是对新函数格式link中命名方案的解释
这是一些更高级的场景的更详细的解释:link
热释光;博士
spring不再使用基于注解的配置,而是使用检测到的 Consumer / Function / Supplier 为您定义流。
输入/消费者
而在你有这样的代码之前:

interface BindableGradesChannel {
    @Input
    fun gradesChannel(): SubscribableChannel

    companion object {
        const val INPUT = "gradesChannel"
    }
}

用法类似于:

@Service
@EnableBinding(BindableGradesChannel::class)
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)

    @StreamListener(BindableScoresChannel.INPUT)
    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}

现在整个定义都无关紧要,可以这样做:

@Service
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)

    @Bean
    fun gradesChannel(): Consumer<Grade> {
        return Consumer { listen(grade = it) }
    }

    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}

注意 Consumer 比恩取代了 @StreamListener 以及 @Input .
关于配置,如果之前为了配置你有一个 application.yml 看起来是这样的:

spring:
  cloud:
    stream:
      bindings:
        gradesChannel:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3

现在应该是这样:

spring:
  cloud:
    stream:
      bindings:
        gradesChannel-in-0:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3

注意怎么做 gradesChannel 被替换为 gradesChannel-in-0 -要了解完整的命名约定,请参阅顶部的命名约定链接。
一些细节:
如果应用程序中有多个这样的bean,则需要定义 spring.cloud.function.definition 财产。
您可以选择为频道指定自定义名称,因此如果您想继续使用 gradesChannel 你可以设置 spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel 并在配置中随处使用 gradesChannel .
输出/供应商
这里的概念是相似的,您替换config,代码如下所示:

interface BindableStudentsChannel {
    @Output
    fun studentsChannel(): MessageChannel
}

@Service
@EnableBinding(BindableStudentsChannel::class)
class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
    fun publish(message: Message<Student>) {
        studentsChannel.studentsChannel().send(message)
    }
}

现在可以替换为:

@Service
class StudentsQueueWriter {
    @Bean
    fun studentsChannel(): Supplier<Student> {
        return Supplier { Student("Adam") }
    }
}

正如你所看到的,我们有一个很大的区别-什么时候叫,谁叫?
以前我们可以手动触发它,但现在它是由spring每秒(默认情况下)触发的。这对于需要每秒发布一个传感器数据的用例来说是很好的,但是当您想要发送一个事件的消息时就不好了。除了使用 Function 无论出于何种原因,spring提供了两种选择:
streambridge-链接
使用 StreamBridge 你可以。明确定义目标,如下所示:

@Service
class StudentsQueueWriter(private val streamBridge: StreamBridge) {
    fun publish(message: Message<Student>) {
        streamBridge.send("studentsChannel-out-0", message)
    }
}

这样,您就不会将目标通道定义为bean,但仍然可以发送消息。缺点是类中有一些显式配置。
React堆api-链接
另一种方法是使用某种React机制,例如 EmitterProcessor ,并归还。使用此选项,您的代码将类似于:

@Service
class StudentsQueueWriter {
    val students: EmitterProcessor<Student> = EmitterProcessor.create()
    @Bean
    fun studentsChannel(): Supplier<Flux<Student>> {
        return Supplier { students }
    }
}

用法可能类似于:

class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
    fun newStudent() {
        studentsQueueWriter.students.onNext(Student("Adam"))
    }
}

相关问题