由于版本3.1,不推荐使用用于处理队列的主要api。在课堂评论中说:从3.1开始就不推荐使用函数式编程模型我在网上搜索了很多解决方案,但没有找到一个关于如何迁移的可靠的e2e解释。寻找以下示例:从队列读取写入队列如果有一些方法可以做到这一点(正如我在web上看到的),我很乐意为每个选项提供一个解释和典型的用例。
nhhxz33t1#
我假设您已经熟悉了主要概念,并将重点介绍迁移。我使用kotlin作为演示代码,以减少冗长首先,一些参考资料可能会有所帮助:以下是初始的相关doc:link这是对新函数格式link中命名方案的解释这是一些更高级的场景的更详细的解释:link热释光;博士spring不再使用基于注解的配置,而是使用检测到的 Consumer / Function / Supplier 为您定义流。输入/消费者而在你有这样的代码之前:
Consumer
Function
Supplier
interface BindableGradesChannel { @Input fun gradesChannel(): SubscribableChannel companion object { const val INPUT = "gradesChannel" }}
interface BindableGradesChannel {
@Input
fun gradesChannel(): SubscribableChannel
companion object {
const val INPUT = "gradesChannel"
}
用法类似于:
@Service@EnableBinding(BindableGradesChannel::class)class GradesListener { private val log = LoggerFactory.getLogger(GradesListener::class.java) @StreamListener(BindableScoresChannel.INPUT) fun listen(grade: Grade) { log.info("Received $grade") // do something }}
@Service
@EnableBinding(BindableGradesChannel::class)
class GradesListener {
private val log = LoggerFactory.getLogger(GradesListener::class.java)
@StreamListener(BindableScoresChannel.INPUT)
fun listen(grade: Grade) {
log.info("Received $grade")
// do something
现在整个定义都无关紧要,可以这样做:
@Serviceclass GradesListener { private val log = LoggerFactory.getLogger(GradesListener::class.java) @Bean fun gradesChannel(): Consumer<Grade> { return Consumer { listen(grade = it) } } fun listen(grade: Grade) { log.info("Received $grade") // do something }}
@Bean
fun gradesChannel(): Consumer<Grade> {
return Consumer { listen(grade = it) }
注意 Consumer 比恩取代了 @StreamListener 以及 @Input .关于配置,如果之前为了配置你有一个 application.yml 看起来是这样的:
@StreamListener
application.yml
spring: cloud: stream: bindings: gradesChannel: destination: GradesExchange group: grades-updates consumer: concurrency: 10 max-attempts: 3
spring:
cloud:
stream:
bindings:
gradesChannel:
destination: GradesExchange
group: grades-updates
consumer:
concurrency: 10
max-attempts: 3
现在应该是这样:
spring: cloud: stream: bindings: gradesChannel-in-0: destination: GradesExchange group: grades-updates consumer: concurrency: 10 max-attempts: 3
gradesChannel-in-0:
注意怎么做 gradesChannel 被替换为 gradesChannel-in-0 -要了解完整的命名约定,请参阅顶部的命名约定链接。一些细节:如果应用程序中有多个这样的bean,则需要定义 spring.cloud.function.definition 财产。您可以选择为频道指定自定义名称,因此如果您想继续使用 gradesChannel 你可以设置 spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel 并在配置中随处使用 gradesChannel .输出/供应商这里的概念是相似的,您替换config,代码如下所示:
gradesChannel
gradesChannel-in-0
spring.cloud.function.definition
spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel
interface BindableStudentsChannel { @Output fun studentsChannel(): MessageChannel}
interface BindableStudentsChannel {
@Output
fun studentsChannel(): MessageChannel
和
@Service@EnableBinding(BindableStudentsChannel::class)class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) { fun publish(message: Message<Student>) { studentsChannel.studentsChannel().send(message) }}
@EnableBinding(BindableStudentsChannel::class)
class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
fun publish(message: Message<Student>) {
studentsChannel.studentsChannel().send(message)
现在可以替换为:
@Serviceclass StudentsQueueWriter { @Bean fun studentsChannel(): Supplier<Student> { return Supplier { Student("Adam") } }}
class StudentsQueueWriter {
fun studentsChannel(): Supplier<Student> {
return Supplier { Student("Adam") }
正如你所看到的,我们有一个很大的区别-什么时候叫,谁叫?以前我们可以手动触发它,但现在它是由spring每秒(默认情况下)触发的。这对于需要每秒发布一个传感器数据的用例来说是很好的,但是当您想要发送一个事件的消息时就不好了。除了使用 Function 无论出于何种原因,spring提供了两种选择:streambridge-链接使用 StreamBridge 你可以。明确定义目标,如下所示:
StreamBridge
@Serviceclass StudentsQueueWriter(private val streamBridge: StreamBridge) { fun publish(message: Message<Student>) { streamBridge.send("studentsChannel-out-0", message) }}
class StudentsQueueWriter(private val streamBridge: StreamBridge) {
streamBridge.send("studentsChannel-out-0", message)
这样,您就不会将目标通道定义为bean,但仍然可以发送消息。缺点是类中有一些显式配置。React堆api-链接另一种方法是使用某种React机制,例如 EmitterProcessor ,并归还。使用此选项,您的代码将类似于:
EmitterProcessor
@Serviceclass StudentsQueueWriter { val students: EmitterProcessor<Student> = EmitterProcessor.create() @Bean fun studentsChannel(): Supplier<Flux<Student>> { return Supplier { students } }}
val students: EmitterProcessor<Student> = EmitterProcessor.create()
fun studentsChannel(): Supplier<Flux<Student>> {
return Supplier { students }
用法可能类似于:
class MyClass(val studentsQueueWriter: StudentsQueueWriter) { fun newStudent() { studentsQueueWriter.students.onNext(Student("Adam")) }}
class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
fun newStudent() {
studentsQueueWriter.students.onNext(Student("Adam"))
1条答案
按热度按时间nhhxz33t1#
我假设您已经熟悉了主要概念,并将重点介绍迁移。
我使用kotlin作为演示代码,以减少冗长
首先,一些参考资料可能会有所帮助:
以下是初始的相关doc:link
这是对新函数格式link中命名方案的解释
这是一些更高级的场景的更详细的解释:link
热释光;博士
spring不再使用基于注解的配置,而是使用检测到的
Consumer
/Function
/Supplier
为您定义流。输入/消费者
而在你有这样的代码之前:
用法类似于:
现在整个定义都无关紧要,可以这样做:
注意
Consumer
比恩取代了@StreamListener
以及@Input
.关于配置,如果之前为了配置你有一个
application.yml
看起来是这样的:现在应该是这样:
注意怎么做
gradesChannel
被替换为gradesChannel-in-0
-要了解完整的命名约定,请参阅顶部的命名约定链接。一些细节:
如果应用程序中有多个这样的bean,则需要定义
spring.cloud.function.definition
财产。您可以选择为频道指定自定义名称,因此如果您想继续使用
gradesChannel
你可以设置spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel
并在配置中随处使用gradesChannel
.输出/供应商
这里的概念是相似的,您替换config,代码如下所示:
和
现在可以替换为:
正如你所看到的,我们有一个很大的区别-什么时候叫,谁叫?
以前我们可以手动触发它,但现在它是由spring每秒(默认情况下)触发的。这对于需要每秒发布一个传感器数据的用例来说是很好的,但是当您想要发送一个事件的消息时就不好了。除了使用
Function
无论出于何种原因,spring提供了两种选择:streambridge-链接
使用
StreamBridge
你可以。明确定义目标,如下所示:这样,您就不会将目标通道定义为bean,但仍然可以发送消息。缺点是类中有一些显式配置。
React堆api-链接
另一种方法是使用某种React机制,例如
EmitterProcessor
,并归还。使用此选项,您的代码将类似于:用法可能类似于: