Running a RunnableGraph with a materializer works fine during normal operation in a Spring Boot service, but fails in unit tests with the following error:
Caused by: java.lang.ClassCastException: scala.concurrent.java8.FuturesConvertersImpl$CF cannot be cast to scala.concurrent.Future
    at akka.stream.javadsl.GraphCreate.$anonfun$create$4(GraphCreate.scala:34)
    at scala.Function2.$anonfun$curried$2(Function2.scala:44)
    at akka.stream.scaladsl.GraphApply.$anonfun$create$2(GraphApply.scala:45)
    at akka.stream.impl.Compose.apply(TraversalBuilder.scala:171)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:530)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:442)
    at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:703)
    at akka.stream.javadsl.RunnableGraph$RunnableGraphAdapter.run(Flow.scala:3846)
I believe this is a dependency problem caused by the Akka Streams Kafka testkit, but I have not been able to find the offending dependency.
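One way to narrow down a suspected dependency conflict like this is to print, from inside the failing test, which jar each class named in the stack trace is actually loaded from. The following is only a minimal sketch using plain JDK calls; the class name ClassOrigin and the helper originOf are hypothetical names introduced for illustration, and the output depends entirely on the test classpath.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Akka or Spring.
final class ClassOrigin {

    /** Returns the jar or directory a class was loaded from. */
    static String originOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        // Classes loaded by the bootstrap loader (e.g. java.lang.String) have no code source.
        return source == null ? "<bootstrap>" : String.valueOf(source.getLocation());
    }

    /** Same lookup by binary class name, without initializing the class. */
    static String originOf(String className) {
        try {
            Class<?> type = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            return originOf(type);
        } catch (ClassNotFoundException e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace in the question.
        String[] names = {
            "scala.concurrent.Future",
            "scala.concurrent.java8.FuturesConvertersImpl$CF",
            "akka.stream.javadsl.RunnableGraph"
        };
        for (String name : names) {
            System.out.println(name + " -> " + originOf(name));
        }
    }
}
```

Calling the same lookups from a unit test and from the running service, then comparing the printed locations, would show whether the test classpath resolves a different jar or version for any of these classes.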
JDK: Oracle 1.8
Spring Boot version: 2.2.6.0
Akka (and Akka Streams) version: 2.6.5
Akka Stream Kafka version: 2.0.2
Code that creates the graph (the run() call on the graph is what fails):
RunnableGraph<Tuple2<Future<Done>, CompletionStage<Done>>> tuple2RunnableGraph = RunnableGraph.fromGraph(graph);
tuple2RunnableGraph.run(materializer);
...
private Graph<ClosedShape, Tuple2<Future<Done>, CompletionStage<Done>>> getSinkFanoutGraph(
        Source<OperationContext, NotUsed> restartableSource,
        Sink<ConsumerMessage.CommittableOffset, CompletionStage<Done>> committerSink,
        Flow<OperationContext, ConsumerMessage.CommittableOffset, NotUsed> committerAdapter,
        Flow<OperationContext, ProducerRecord<String, byte[]>, NotUsed> senderFiltered) {
    return GraphDSL.create(sink, committerSink, Tuple2::new, (builder, s1, s2) -> {
        UniformFanOutShape<OperationContext, OperationContext> broadcast = builder.add(Broadcast.create(2));
        FlowShape<OperationContext, ConsumerMessage.CommittableOffset> getOffset = builder.add(committerAdapter);
        FlowShape<OperationContext, ProducerRecord<String, byte[]>> sendFiltered = builder.add(senderFiltered);
        builder.from(builder.add(restartableSource)).viaFanOut(broadcast);
        builder.from(broadcast.out(0)).via(sendFiltered).to(s1);
        builder.from(broadcast.out(1)).via(getOffset).to(s2);
        return ClosedShape.getInstance();
    });
}
Relevant parts of the pom file:
<akka.version>2.6.5</akka.version>
<akka.stream.kafka.version>2.0.2</akka.stream.kafka.version>
...
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-kafka_2.13</artifactId>
    <version>${akka.stream.kafka.version}</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.13</artifactId>
    <version>${akka.version}</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-testkit_2.13</artifactId>
    <version>${akka.version}</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-kafka-testkit_2.13</artifactId>
    <version>${akka.stream.kafka.version}</version>
    <scope>test</scope>
</dependency>