apache-kafka 如何为apache camel的containerstart操作指定containers id?

zzwlnbp8  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(151)

我尝试使用Sping Boot + Apache Camel + Kafka来订阅某个主题,然后启动一些dockers容器并等待它,但有一次我添加了一行:to(docker:contianerstart?host...),则会得到下一个异常:

java.lang.IllegalArgumentException: Container ID must be specified must be specified
    at org.apache.camel.util.ObjectHelper.notNull(ObjectHelper.java:151) ~[camel-util-3.7.0.jar:3.7.0]
    at org.apache.camel.component.docker.producer.DockerProducer.executeStartContainerRequest(DockerProducer.java:1045) ~[camel-docker-3.7.0.jar:3.7.0]
    at org.apache.camel.component.docker.producer.DockerProducer.process(DockerProducer.java:163) ~[camel-docker-3.7.0.jar:3.7.0]
    at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.7.0.jar:3.7.0]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:169) ~[camel-core-processor-3.7.0.jar:3.7.0]
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:395) ~[camel-core-processor-3.7.0.jar:3.7.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) ~[camel-base-engine-3.7.0.jar:3.7.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) ~[camel-base-engine-3.7.0.jar:3.7.0]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) ~[camel-core-processor-3.7.0.jar:3.7.0]
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:312) ~[camel-base-engine-3.7.0.jar:3.7.0]
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-engine-3.7.0.jar:3.7.0]
    at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[camel-support-3.7.0.jar:3.7.0]
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:357) ~[camel-kafka-3.7.0.jar:3.7.0]
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:222) ~[camel-kafka-3.7.0.jar:3.7.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

2022-10-24 17:02:24.101  WARN 251133 --- [new-crew-files]] o.a.camel.component.kafka.KafkaConsumer  : Error during processing. Exchange[F02B0892689E52A-0000000000000000]. Caused by: [java.lang.IllegalArgumentException - Container ID must be specified must be specified]

java.lang.IllegalArgumentException: Container ID must be specified must be specified
    at org.apache.camel.util.ObjectHelper.notNull(ObjectHelper.java:151) ~[camel-util-3.7.0.jar:3.7.0]
    at org.apache.camel.component.docker.producer.DockerProducer.executeStartContainerRequest(DockerProducer.java:1045) ~[camel-docker-3.7.0.jar:3.7.0]
    at org.apache.camel.component.docker.producer.DockerProducer.process(DockerProducer.java:163) ~[camel-docker-3.7.0.jar:3.7.0]
    at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.7.0.jar:3.7.0]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:169) ~[camel-core-processor-3.7.0.jar:3.7.0]
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:395) ~[camel-core-processor-3.7.0.jar:3.7.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) ~[camel-base-engine-3.7.0.jar:3.7.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) ~[camel-base-engine-3.7.0.jar:3.7.0]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) ~[camel-core-processor-3.7.0.jar:3.7.0]
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:312) ~[camel-base-engine-3.7.0.jar:3.7.0]
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-engine-3.7.0.jar:3.7.0]
    at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[camel-support-3.7.0.jar:3.7.0]
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:357) ~[camel-kafka-3.7.0.jar:3.7.0]
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:222) ~[camel-kafka-3.7.0.jar:3.7.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

下面是代码。如何将容器ID传递到这里?

from("kafka:new-files?brokers=localhost:9092")
    .setProperty("proc_vers", simple(procVers()))
    .setHeader(DockerConstants.DOCKER_REPOSITORY, simple("parser"))
    .setHeader(DockerConstants.DOCKER_TAG, simple("latest")) 
    .setHeader(DockerConstants.DOCKER_IMAGE_ID, simple("efded01f5f75"))
    .setHeader(DockerConstants.DOCKER_IMAGE, simple("parser:latest"))
    .setHeader(DockerConstants.DOCKER_NAME, exchangeProperty("proc_vers"))
    .setHeader(DockerConstants.DOCKER_ENV, () -> new String []{
                        "MODE=xxx", "PARSER_CLIENT=xxx",
                        "PATHS=/home/xxx/xxx"  })
    .setHeader(DockerConstants.DOCKER_BINDS, simple("/home/xxx:/xxx"))
    .to("docker:containercreate?host=/var/run/docker.sock")
    .to("docker:containerstart?host=/var/run/docker.sock?parameters=")
    .log("container started")
    .log("container end");
eqqqjvef

eqqqjvef1#

由于containercreate操作返回com.github.dockerjava.api.command.CreateContainerResponse类型的响应,因此可以从其getter中检索id,因此只需在调用containerstart操作之前设置与容器id相对应的Camel标头,如下所示:

...
    .to("docker:containercreate")
    // Calls getId on the body of the message that is of type 
    // CreateContainerResponse and uses the result as container id
    .setHeader(DockerConstants.DOCKER_CONTAINER_ID, simple("${body.id}"))
    .to("docker:containerstart")
    ...

相关问题