java—在ApacheCamel中创建发送到dynamic.to()的动态路由

rhfm7lfc  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(298)

我想创建一个应用程序,路由将决定在运行时间的两端,从()和到()和处理之间保持不变
我的示例:我希望在我的应用程序中有一个配置文件,在那里我将告诉源和目标,而中间作业保持不变。
在我的所有解决方案中,我将始终以进程()的形式执行作业
棘手的部分是让两端都充满活力。。。
我读过关于收件人列表,tod(),choice()。when()
我似乎不能使这项工作。
我的目标是拥有某种routefactory,它将相应地设置camelcontext(假设我的选项是kafka或rabbitmq)
一旦我得到了kafka/rabbitmq路由,现在我的processor()将在消息中设置一个头,告诉我我的to()是否需要转到kafka/rabbitmq
我只解决了这个问题的一端,要么我的from()是动态的,要么我的to()是动态的。。。
但我似乎不能让它们都充满活力
我创建了一个这样的工厂:

public RouteBuilder RouteFactory(String type, Processor processor)
{
    switch (type)
    {
        case "kafka":
            return new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    // setup kafka component with the brokers
                    KafkaComponent kafka = new KafkaComponent();
                    kafka.setBrokers("{{kafka.host}}:{{kafka.port}}");
                    getContext().addComponent("kafka", kafka);
                    log.info("About to start route: Kafka Server -> Log ");

                    from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                            + "&maxPollRecords={{consumer.maxPollRecords}}"
                            + "&consumersCount={{consumer.consumersCount}}"
                            + "&seekTo={{consumer.seekTo}}"
                            + "&groupId={{consumer.group}}"
                            + "&valueDeserializer=" + BytesDeserializer.class.getName())
                            .routeId("FromKafka")
                            .process(processor)
                            .choice()
                                .when(header("kafka"))
                                    .recipientList(simple("kafka:${header.kafka[topicName]}"))
                                .end()
                                .when(header("rabbit"))
                                    .recipientList(simple("rabbit:${header.rabbit[queueName]}"))
                                .end();
                }
            };
        case "rabbit":
            return new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    // set rabbit component

                    from("rabbit")
                            .routeId("rabbit")
                            .process(processor)
                            .choice()
                                .when(header("kafka"))
                                    .recipientList(simple("kafka:${header.kafka[topicName]}"))
                                .end()
                                .when(header("rabbit"))
                                    .recipientList(simple("rabbit:${header.rabbit[queueName]}"))
                                .end();
                }
            };
    }

    return null;
}

正如你所看到的,两条路线都有相同的结尾,我添加的消息越多,我需要做的复制粘贴就越多。。。。
我似乎找不到一个方法使to()动态(可能是另一个工厂?但是to()方法不接受路由)
如果我可以在我所有的工厂路径中使用相同的generic to(),然后在一个地方管理我所有的to()选项,那将是我尝试使用的最佳选择direct:outputmanager但是“outputmanager”必须从一个from()开始,我只需要解析一个to()
p、 s-我是 Camel 队的新手,所以我承认我可能已经完全离开了,还有一个更简单的解决方案,我很高兴听到它p.s 2-我只检查了Kafka路线和兔子路线,我只是做了编码,以适应这个问题,所以我知道它不会在兔子的一部分工作
编辑:正如paizo在我探索端点方向的评论中所建议的那样,看起来我唯一错过的事情就是能够访问端点中的消息,以便决定返回哪个生产者
代码:

.to(new DefaultEndpoint() {
                @Override
                public Producer createProducer() throws Exception {

                    // get message header and decide which producer to return
                    Producer producer;
                    //case kafka
                    producer = new KafkaProducer(new KafkaEndpoint("kafka:blaTopic",new KafkaComponent(getContext())));
                    //case rabbit - psodo code
                    producer = new RabbitProducer();

                    return producer;
                }

                @Override
                public Consumer createConsumer(Processor processor) throws Exception {
                    throw new UnsupportedOperationException("You cannot receive messages from this endpoint");
                }

                @Override
                public boolean isSingleton() {
                    return false;
                }
            })

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题