我尝试使用Sping Boot + Apache Camel + Kafka来订阅某个主题,然后启动一些dockers容器并等待它,但有一次我添加了一行:to(docker:contianerstart?host...)
,则会得到下一个异常:
java.lang.IllegalArgumentException: Container ID must be specified must be specified
at org.apache.camel.util.ObjectHelper.notNull(ObjectHelper.java:151) ~[camel-util-3.7.0.jar:3.7.0]
at org.apache.camel.component.docker.producer.DockerProducer.executeStartContainerRequest(DockerProducer.java:1045) ~[camel-docker-3.7.0.jar:3.7.0]
at org.apache.camel.component.docker.producer.DockerProducer.process(DockerProducer.java:163) ~[camel-docker-3.7.0.jar:3.7.0]
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.7.0.jar:3.7.0]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:169) ~[camel-core-processor-3.7.0.jar:3.7.0]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:395) ~[camel-core-processor-3.7.0.jar:3.7.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) ~[camel-base-engine-3.7.0.jar:3.7.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) ~[camel-base-engine-3.7.0.jar:3.7.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) ~[camel-core-processor-3.7.0.jar:3.7.0]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:312) ~[camel-base-engine-3.7.0.jar:3.7.0]
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-engine-3.7.0.jar:3.7.0]
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[camel-support-3.7.0.jar:3.7.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:357) ~[camel-kafka-3.7.0.jar:3.7.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:222) ~[camel-kafka-3.7.0.jar:3.7.0]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
2022-10-24 17:02:24.101 WARN 251133 --- [new-crew-files]] o.a.camel.component.kafka.KafkaConsumer : Error during processing. Exchange[F02B0892689E52A-0000000000000000]. Caused by: [java.lang.IllegalArgumentException - Container ID must be specified must be specified]
java.lang.IllegalArgumentException: Container ID must be specified must be specified
at org.apache.camel.util.ObjectHelper.notNull(ObjectHelper.java:151) ~[camel-util-3.7.0.jar:3.7.0]
at org.apache.camel.component.docker.producer.DockerProducer.executeStartContainerRequest(DockerProducer.java:1045) ~[camel-docker-3.7.0.jar:3.7.0]
at org.apache.camel.component.docker.producer.DockerProducer.process(DockerProducer.java:163) ~[camel-docker-3.7.0.jar:3.7.0]
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.7.0.jar:3.7.0]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:169) ~[camel-core-processor-3.7.0.jar:3.7.0]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:395) ~[camel-core-processor-3.7.0.jar:3.7.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) ~[camel-base-engine-3.7.0.jar:3.7.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) ~[camel-base-engine-3.7.0.jar:3.7.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) ~[camel-core-processor-3.7.0.jar:3.7.0]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:312) ~[camel-base-engine-3.7.0.jar:3.7.0]
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-engine-3.7.0.jar:3.7.0]
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[camel-support-3.7.0.jar:3.7.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:357) ~[camel-kafka-3.7.0.jar:3.7.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:222) ~[camel-kafka-3.7.0.jar:3.7.0]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
下面是代码。如何将容器ID传递到这里?
from("kafka:new-files?brokers=localhost:9092")
.setProperty("proc_vers", simple(procVers()))
.setHeader(DockerConstants.DOCKER_REPOSITORY, simple("parser"))
.setHeader(DockerConstants.DOCKER_TAG, simple("latest"))
.setHeader(DockerConstants.DOCKER_IMAGE_ID, simple("efded01f5f75"))
.setHeader(DockerConstants.DOCKER_IMAGE, simple("parser:latest"))
.setHeader(DockerConstants.DOCKER_NAME, exchangeProperty("proc_vers"))
.setHeader(DockerConstants.DOCKER_ENV, () -> new String []{
"MODE=xxx", "PARSER_CLIENT=xxx",
"PATHS=/home/xxx/xxx" })
.setHeader(DockerConstants.DOCKER_BINDS, simple("/home/xxx:/xxx"))
.to("docker:containercreate?host=/var/run/docker.sock")
.to("docker:containerstart?host=/var/run/docker.sock?parameters=")
.log("container started")
.log("container end");
1条答案
按热度按时间eqqqjvef1#
由于
containercreate
操作返回com.github.dockerjava.api.command.CreateContainerResponse
类型的响应,因此可以从其getter中检索id,因此只需在调用containerstart
操作之前设置与容器id相对应的Camel标头,如下所示: