akka流测试:futuresconvertersimpl$cf classcastexception in phasedfusingactormaterializer.materialize

vof42yt1  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(198)

使用materializer运行runnablegraph对于springboot服务中的正常操作来说工作正常,但在单元测试中由于以下错误而失败:

Caused by: java.lang.ClassCastException: scala.concurrent.java8.FuturesConvertersImpl$CF cannot be cast to scala.concurrent.Future
    at akka.stream.javadsl.GraphCreate.$anonfun$create$4(GraphCreate.scala:34)
    at scala.Function2.$anonfun$curried$2(Function2.scala:44)
    at akka.stream.scaladsl.GraphApply.$anonfun$create$2(GraphApply.scala:45)
    at akka.stream.impl.Compose.apply(TraversalBuilder.scala:171)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:530)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:442)
    at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:703)
    at akka.stream.javadsl.RunnableGraph$RunnableGraphAdapter.run(Flow.scala:3846)

我相信这是由akka流kafka testkit引起的依赖性问题。但是我一直找不到问题依赖。
jdk:oracle 1.8
spring boot版本:2.2.6.0
akka(和akka streams)版本:2.6.5
akka streamKafka版本:2.0.2
创建图形的代码(graph.run()函数失败):

RunnableGraph<Tuple2<Future<Done>, CompletionStage<Done>>> tuple2RunnableGraph = RunnableGraph.fromGraph(graph);
        tuple2RunnableGraph.run(materializer);
...

private Graph<ClosedShape, Tuple2<Future<Done>, CompletionStage<Done>>> getSinkFanoutGraph(Source<OperationContext, NotUsed> restartableSource, Sink<ConsumerMessage.CommittableOffset, CompletionStage<Done>> committerSink, Flow<OperationContext, ConsumerMessage.CommittableOffset, NotUsed> committerAdapter, Flow<OperationContext, ProducerRecord<String, byte[]>, NotUsed> senderFiltered) {
        return GraphDSL.create(sink, committerSink, Tuple2::new, (builder, s1, s2) -> {
            UniformFanOutShape<OperationContext, OperationContext> broadcast = builder.add(Broadcast.create(2));

            FlowShape<OperationContext, ConsumerMessage.CommittableOffset> getOffset = builder.add(committerAdapter);
            FlowShape<OperationContext, ProducerRecord<String, byte[]>> sendFiltered = builder.add(senderFiltered);

            builder.from(builder.add(restartableSource)).viaFanOut(broadcast);
            builder.from(broadcast.out(0)).via(sendFiltered).to(s1);
            builder.from(broadcast.out(1)).via(getOffset).to(s2);
            return ClosedShape.getInstance();
        });
    }

pom文件的相关部分:

<akka.version>2.6.5</akka.version>
        <akka.stream.kafka.version>2.0.2</akka.stream.kafka.version>
...
    <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-stream-kafka_2.13</artifactId>
            <version>${akka.stream.kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-stream_2.13</artifactId>
            <version>${akka.version}</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-stream-testkit_2.13</artifactId>
            <version>${akka.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-stream-kafka-testkit_2.13</artifactId>
            <version>${akka.stream.kafka.version}</version>
            <scope>test</scope>
        </dependency>

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题