enablebinding、output、input自3.1版spring cloud stream以来已弃用

jdg4fx2g  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(680)

由于版本3.1,不推荐使用用于处理队列的主要api。在课堂评论中说:
从3.1开始就不推荐使用函数式编程模型
我在网上搜索了很多解决方案,但没有找到一个关于如何迁移的可靠的e2e解释。
寻找以下示例:
从队列读取
写入队列
如果有一些方法可以做到这一点(正如我在web上看到的),我很乐意为每个选项提供一个解释和典型的用例。

nhhxz33t

nhhxz33t1#

我假设您已经熟悉了主要概念,并将重点介绍迁移。
我使用kotlin作为演示代码,以减少冗长
首先,一些参考资料可能会有所帮助:
以下是初始的相关doc:link
这是对新函数格式link中命名方案的解释
这是一些更高级的场景的更详细的解释:link
热释光;博士
spring不再使用基于注解的配置,而是使用检测到的 Consumer / Function / Supplier 为您定义流。
输入/消费者
而在你有这样的代码之前:

  1. interface BindableGradesChannel {
  2. @Input
  3. fun gradesChannel(): SubscribableChannel
  4. companion object {
  5. const val INPUT = "gradesChannel"
  6. }
  7. }

用法类似于:

  1. @Service
  2. @EnableBinding(BindableGradesChannel::class)
  3. class GradesListener {
  4. private val log = LoggerFactory.getLogger(GradesListener::class.java)
  5. @StreamListener(BindableScoresChannel.INPUT)
  6. fun listen(grade: Grade) {
  7. log.info("Received $grade")
  8. // do something
  9. }
  10. }

现在整个定义都无关紧要,可以这样做:

  1. @Service
  2. class GradesListener {
  3. private val log = LoggerFactory.getLogger(GradesListener::class.java)
  4. @Bean
  5. fun gradesChannel(): Consumer<Grade> {
  6. return Consumer { listen(grade = it) }
  7. }
  8. fun listen(grade: Grade) {
  9. log.info("Received $grade")
  10. // do something
  11. }
  12. }

注意 Consumer 比恩取代了 @StreamListener 以及 @Input .
关于配置,如果之前为了配置你有一个 application.yml 看起来是这样的:

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. gradesChannel:
  6. destination: GradesExchange
  7. group: grades-updates
  8. consumer:
  9. concurrency: 10
  10. max-attempts: 3

现在应该是这样:

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. gradesChannel-in-0:
  6. destination: GradesExchange
  7. group: grades-updates
  8. consumer:
  9. concurrency: 10
  10. max-attempts: 3

注意怎么做 gradesChannel 被替换为 gradesChannel-in-0 -要了解完整的命名约定,请参阅顶部的命名约定链接。
一些细节:
如果应用程序中有多个这样的bean,则需要定义 spring.cloud.function.definition 财产。
您可以选择为频道指定自定义名称,因此如果您想继续使用 gradesChannel 你可以设置 spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel 并在配置中随处使用 gradesChannel .
输出/供应商
这里的概念是相似的,您替换config,代码如下所示:

  1. interface BindableStudentsChannel {
  2. @Output
  3. fun studentsChannel(): MessageChannel
  4. }

  1. @Service
  2. @EnableBinding(BindableStudentsChannel::class)
  3. class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
  4. fun publish(message: Message<Student>) {
  5. studentsChannel.studentsChannel().send(message)
  6. }
  7. }

现在可以替换为:

  1. @Service
  2. class StudentsQueueWriter {
  3. @Bean
  4. fun studentsChannel(): Supplier<Student> {
  5. return Supplier { Student("Adam") }
  6. }
  7. }

正如你所看到的,我们有一个很大的区别-什么时候叫,谁叫?
以前我们可以手动触发它,但现在它是由spring每秒(默认情况下)触发的。这对于需要每秒发布一个传感器数据的用例来说是很好的,但是当您想要发送一个事件的消息时就不好了。除了使用 Function 无论出于何种原因,spring提供了两种选择:
streambridge-链接
使用 StreamBridge 你可以。明确定义目标,如下所示:

  1. @Service
  2. class StudentsQueueWriter(private val streamBridge: StreamBridge) {
  3. fun publish(message: Message<Student>) {
  4. streamBridge.send("studentsChannel-out-0", message)
  5. }
  6. }

这样,您就不会将目标通道定义为bean,但仍然可以发送消息。缺点是类中有一些显式配置。
React堆api-链接
另一种方法是使用某种React机制,例如 EmitterProcessor ,并归还。使用此选项,您的代码将类似于:

  1. @Service
  2. class StudentsQueueWriter {
  3. val students: EmitterProcessor<Student> = EmitterProcessor.create()
  4. @Bean
  5. fun studentsChannel(): Supplier<Flux<Student>> {
  6. return Supplier { students }
  7. }
  8. }

用法可能类似于:

  1. class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
  2. fun newStudent() {
  3. studentsQueueWriter.students.onNext(Student("Adam"))
  4. }
  5. }
展开查看全部

相关问题