这几天我在学习scdf…我有一些关于指定目的地的问题。我创建了一个类似“:testtopic>log”的流。我可以看到日志接收器使用主题“test topic”中的数据。但是如果我向scdf添加一个自定义处理器,那么我会创建一个如下流:
:测试主题源>etl数据转换>:测试主题接收器
我认为“etl data transform”处理器将从主题“test topic source”(kafka)和产品数据消耗到“test topic sink”,但是日志是“subscribed to topic(s):stringoperation-in-0”和“using kafka topic for outbound:stringoperation-out-0”(“stringoperation”是我的自定义函数)
为什么scdf不使用主题“测试主题源”和“测试主题接收器”?如何解决?
我知道我可以使用这样的属性:spring.cloud.stream.function.bindings.stringoperation-in-0=in-spring.cloud.stream.bindings.in.destination=test-topic-source但是如果我想输出到两个主题?
谢谢!
2条答案
按热度按时间r6vfmomb1#
看起来您的自定义处理器应用程序使用
inbound
以及outbound
要输入的名称in
以及out
. scdf希望这些名称input
以及output
分别。这就是您看到基于这些名称的显式绑定名称的原因in
以及out
. 请把名字改成input
以及output
我相信这会解决这个问题。nfs0ujit2#
谢谢!那是我的错。我将'@enablebinding(processor.class)'添加到我的自定义处理器,然后修复它。