spring集成dsl语法问题-如何动态构造子流?

thtygnil  于 2021-07-06  发布在  Java
关注(0)|答案(1)|浏览(409)

我试图在spring集成中构建一个复杂的流,其中子流是在运行时动态定义的。在主流定义中运行良好的代码在子流定义中的编译失败。由于构造看起来是相同的,所以发生了什么并不明显。如有任何解释,将不胜感激。
先谢谢你。
主流定义的编码如下:

StandardIntegrationFlow flow = IntegrationFlows
            .from(setupAdapter,
                    c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))

// This one compiles fine
            .enrichHeaders(h -> h.headerExpression("start", "start\")")
                    .headerExpression("end", "payload[0].get(\"end\")"))

            .split(tableSplitter)
            .enrichHeaders(h -> h.headerExpression("object", "payload[0].get(\"object\")"))
            .channel(c -> c.executor(stepTaskExecutor))
            .routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules))
            .aggregate()
            .handle(cleanupAdapter).get();

BuildRecipientListerSpecForRules定义为:

private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
        Collection<RuleMetadata> rules) {
    rules.forEach(
            rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));

    return recipientListSpec;
}

createflowdefforrule()只是一个 switch() Package 器,用于为规则定义的流选择要运行的实际dsl。这是一个样本

public IntegrationFlowDefinition constructASpecificFlowDef(IntegrationFlowDefinition flowDef, RuleMetadata rule) {

    return flowDef
       // This enrichHeaders element fails to compile,
       // The method headerExpression(String, String) is undefined for the type Object
            .enrichHeaders(h -> h.headerExpression("ALC_operation", "payload[0].get(\"ALC_operation\")"));

 }
r9f1avp5

r9f1avp51#

一般来说,最好将这些解释放在问题文本中,而不是作为注解放在代码片段中;我完全没有听到那句话。
你能提供一个精简的(更简单的)例子(完整的类)来展示这个行为吗?
我试着简化你正在做的事情,结果编译得很好,工作正常:

@SpringBootApplication
public class So65010958Application {

    public static void main(String[] args) {
        SpringApplication.run(So65010958Application.class, args);
    }

    @Bean
    IntegrationFlow flow() {
        return IntegrationFlows.from("foo")
                .routeToRecipients(r -> r.recipientFlow("true", f -> buildFlow(f)))
                .get();
    }

    private IntegrationFlowDefinition<?> buildFlow(IntegrationFlowDefinition<?> f) {
        return f.enrichHeaders(h -> h.headerExpression("foo", "'bar'"))
                .channel(MessageChannels.queue("bar"));
    }

    @Bean
    public ApplicationRunner runner(MessageChannel foo, PollableChannel bar) {
        return args -> {
            foo.send(new GenericMessage<>("foo"));
            System.out.println(bar.receive(0));
        };
    }

}

genericmessage[payload=foo,headers={foo=bar,id=d526b8fb-c6f8-7731-b1ad-e68e326fcc00,timestamp=1606333567749}]
所以,我肯定错过了什么。

相关问题