sqsacksink使akka流永远挂起,导致图形重新启动

piztneat  于 2021-06-27  发布在  Java
关注(0)|答案(0)|浏览(375)

我尝试用一个sqs源代码和一个localstack测试来实现一个简单的工作流。如果我再加上 SqsAckSink ,也不适用于 SqsAckFlow . 但如果我移除 SqsAckSink 就用它吧 Sink.seq() ,则测试通过。拥有 SqsAckSink 或者 SqsAckFlow 让考试永远悬着。我也在测试中启用了debug,看到同样的错误一次又一次地重复,使得图形重新启动,但这对我来说没有多大意义。我将在下面的代码片段之后发布错误消息。
代码是:

  1. public class DefaultWorkflow implements Workflow {
  2. private Function<Throwable, Supervision.Directive> errorStrategy;
  3. private final ActorSystem actorSystem;
  4. private FlowMonitor<ProvisioningResult> workflowMonitor;
  5. private final String queueUrl;
  6. private final SqsAsyncClient asyncClient;
  7. @Inject
  8. public DefaultWorkflow(ActorSystem actorSystem, String queueUrl) {
  9. this.errorStrategy = exc -> (Supervision.Directive) Supervision.resume();
  10. this.actorSystem = actorSystem;
  11. this.queueUrl = queueUrl;
  12. asyncClient =
  13. SqsAsyncClient.builder()
  14. .region(Region.of(Localstack.getDefaultRegion()))
  15. .httpClient(AkkaHttpClient.builder().withActorSystem(actorSystem).build())
  16. .endpointOverride(new URI(queueUrl))
  17. .build();
  18. doWork();
  19. }
  20. private Flow<Message, ProvisioningResult, NotUsed> buildFlow() {
  21. return Flow.of(Message.class)
  22. .via(Flow.of(Message.class).map(m -> ProvisioningResult.builder().body(m.body()).build()));
  23. }
  24. @Override
  25. public Source<ProvisioningResult, FlowMonitor<ProvisioningResult>> getSource() {
  26. Source<Message, NotUsed> sqsSource =
  27. RestartSource.onFailuresWithBackoff(
  28. Duration.ofSeconds(1), Duration.ofSeconds(2), 0.1, this::createSQSSource);
  29. return sqsSource
  30. .via(buildFlow())
  31. .withAttributes(ActorAttributes.withSupervisionStrategy(errorStrategy))
  32. .monitorMat(Keep.right());
  33. }
  34. private Source<Message, NotUsed> createSQSSource() {
  35. SqsSourceSettings sqsSourceSettings = SqsSourceSettings.create().withMaxBatchSize(1);
  36. return SqsSource.create(queueUrl, sqsSourceSettings, asyncClient);
  37. }
  38. @Override
  39. public FlowMonitor<ProvisioningResult> getWorkflowMonitor() {
  40. return this.workflowMonitor;
  41. }
  42. private void doWork() {
  43. Pair<FlowMonitor<ProvisioningResult>, CompletionStage<Done>> run =
  44. getSource()
  45. .toMat(SqsAckSink.create(queueUrl, SqsAckSettings.create(), asyncClient), Keep.both())
  46. .run(actorSystem);
  47. workflowMonitor = run.first();
  48. }
  49. }

测试如下所示:

  1. @Test
  2. public void getSource_givenMessage_shouldProduceResult()
  3. throws InterruptedException, ExecutionException, TimeoutException, URISyntaxException {
  4. String sqsName = "sqs2";
  5. String messageBody = "someMessage";
  6. String sqsUrl = initSQS(sqsName);
  7. generateSourceData(sqsUrl, messageBody);
  8. this.defaultWorkflow = new DefaultWorkflow(this.actorSystem, sqsUrl);
  9. Source<ProvisioningResult, FlowMonitor<ProvisioningResult>> source =
  10. defaultWorkflow.getSource();
  11. final CompletionStage<List<ProvisioningResult>> future =
  12. source.take(1).runWith(Sink.seq(), materializer);
  13. final List<ProvisioningResult> result = future.toCompletableFuture().join();
  14. assertEquals(1, result.size());
  15. assertEquals(result.get(0).getBody(), messageBody);
  16. }
  17. public void generateSourceData(String queueUrl, String messageBody) {
  18. client
  19. .sendMessage(
  20. SendMessageRequest.builder().queueUrl(queueUrl).messageBody(messageBody).build())
  21. .join();
  22. }
  23. private void initClient() throws URISyntaxException {
  24. System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
  25. AwsCredentials credentials = AwsBasicCredentials.create("somekey", "somevalue");
  26. StaticCredentialsProvider provider = StaticCredentialsProvider.create(credentials);
  27. client =
  28. SqsAsyncClient.builder()
  29. .region(Region.of(Localstack.getDefaultRegion()))
  30. .httpClient(AkkaHttpClient.builder().withActorSystem(ActorSystem.create()).build())
  31. .credentialsProvider(provider)
  32. .endpointOverride(new URI(Localstack.INSTANCE.getEndpointSQS()))
  33. .build();
  34. }
  35. protected String initSQS(String queueName) throws URISyntaxException {
  36. initClient();
  37. client
  38. .createQueue(CreateQueueRequest.builder().queueName(queueName).build())
  39. .join()
  40. .queueUrl();
  41. GetQueueUrlResponse response = client.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()).join();
  42. System.out.println("Using queue " + response);
  43. log.info("Using queue {}", response);
  44. return response.queueUrl();
  45. }

当我启用调试时,我看到这个错误会永远重复:
[info][debug]s.a.a.c.i.executioninterceptorchain-创建一个拦截器链,该链将按以下顺序应用拦截器:[software.amazon.awssdk.awscore.interceptor。helpfulunknownhostexceptioninterceptor@41a58812,software.amazon.awssdk.services.sqs.internal。messagemd5checksuminterceptor@4f626e56,software.amazon.awssdk.protocols.query.interceptor。queryparameterstobodyinterceptor@4b5de362][信息][调试]s.a.a.c.i.executioninterceptorchain-interceptor'software.amazon.awssdk.protocols.query.interceptor。queryparameterstobodyinterceptor@4b5de362'使用modifyhttprequest方法修改了消息[info][debug]s.a.a.request-发送请求:defaultsdkhttpfullrequest(httpmethod=post,protocol=http,host=localhost,port=4566,encodedpath=,headers=[amz sdk invocation id,content length,content type,user agent],queryparameters=[])[info][debug]s.a.a.s.aws4signer-aws4要签名的字符串:aws4-hmac-sha256[info]20201230t233655z[info]20201230/us-east-1/sqs/aws4\u request[info]5e95b0e6707472e57434cb0da9516ef2ef4c747760bda87e9207161f9834d6dc01[info][debug]s.a.request-收到的错误响应:500[info][debug]s.a.a.request-可重试的错误检测。将在47ms后重试。请求尝试次数2[info][debug]s.a.a.request-重试请求:defaultsdkhttpfullrequest(httpmethod=post,protocol=http,host=localhost,port=4566,encodedpath=,headers=[amz sdk调用id,内容长度,内容类型,用户代理],queryparameters=[])[info][debug]s.a.a.s.aws4signer-aws4要签名的字符串:aws4-hmac-sha256[info]20201230t233655z[info]20201230/us-east-1/sqs/aws4\u request[info]ebce46865d8fface363b83481672fa3a7a11f584f2ea7c1e2b56e3813afc[info][debug]s.a.request-收到的错误响应:500[info][debug]s.a.a.request-可重试的错误检测。将在51毫秒后重试。请求尝试次数3[info][debug]s.a.a.request-重试请求:defaultsdkhttpfullrequest(httpmethod=post,protocol=http,host=localhost,port=4566,encodedpath=,headers=[amz sdk调用id,内容长度,内容类型,用户代理],queryparameters=[])[info][debug]s.a.a.s.aws4signer-aws4要签名的字符串:aws4-hmac-sha256[info]20201230t233655z[info]20201230/us-east-1/sqs/aws4\u request[info]4085d30b4fa9cce89c435ed7a640539a665bc2bfd4322fafd4095d2cb58fab1[info][debug]s.a.request-收到的错误响应:500[info][debug]s.a.a.request-可重试的错误检测。将在230ms后重试。请求尝试次数4[info][debug]s.a.a.request-重试请求:defaultsdkhttpfullrequest(httpmethod=post,protocol=http,host=localhost,port=4566,encodedpath=,headers=[amz sdk调用id,内容长度,内容类型,用户代理],queryparameters=[])[info][debug]s.a.a.s.aws4signer-aws4要签名的字符串:aws4-hmac-sha256[info]20201230t233655z[info]20201230/us-east-1/sqs/aws4\u request[info]618037af1ce6008ce721fd3ca9cddde2e3f2893d50bd9ff5a241cc2061e1d67f[info][debug]s.a.request-收到的错误响应:500[info][warn]a.s.restartwackoffsource-由于故障而重新启动图形。堆栈跟踪:[info]java.util.concurrent.completionexception:software.amazon.awssdk.services.sqs.model.sqseexception:null(服务:sqs,状态代码:500,请求id:null)[info]位于software.amazon.awssdk.utils.completablefutureutils.errorascompletionexception(completablefutureutils)。java:60)[信息]位于software.amazon.awssdk.core.internal.http.pipeline.stages.asyncexecutionfailureexceptionreportingstage.lambda$execute$0(asyncexecutionfailureexceptionreportingstage)。java:51)[信息]在java.base/java.util.concurrent.completablefuture.unihandle(completablefuture。java:930)[信息]位于java.base/java.util.concurrent.completablefuture$unihandle.tryfire(completablefuture。java:907)[信息]位于java.base/java.util.concurrent.completablefuture.postcomplete(completablefuture。java:506)[信息]在java.base/java.util.concurrent.completablefuture.completeexceptionally(completablefuture。java:2088)[信息]位于software.amazon.awssdk.utils.completablefutureutils.lambda$forwardexception$0(completablefutureutils)。java:74)[信息]位于java.base/java.util.concurrent.completablefuture.uniwhencomplete(completablefuture。java:859)[信息]在java.base/java.util.concurrent.completablefuture$uniwhencomplete.tryfire(completablefuture)。java:837)[信息]位于java.base/java.util.concurrent.completablefuture.postcomplete(completablefuture。java:506)[信息]位于java.base/java.util.concurrent.completablefuture.completeexceptional(completablefuture。java:2088)[信息]在software.amazon.awssdk.core.internal.http.pipeline.stages.asyncretryablestage$retryingexecutor.maybeatTestExecute(asyncretryablestage)。java:85)[信息]位于software.amazon.awssdk.core.internal.http.pipeline.stages.asyncretryablestage$retryingexecutor.mayberetryexecute(asyncretryablestage)。java:144)[信息]在software.amazon.awssdk.core.internal.http.pipeline.stages.asyncretryablestage$retryingexecutor.lambda$attemptexecute$1(asyncretryablestage)。java:133)[信息]位于java.base/java.util.concurrent.completablefuture.uniwhencomplete(completablefuture。java:859)[信息]在java.base/java.util.concurrent.completablefuture$uniwhencomplete.tryfire(completablefuture)。java:837)[信息]位于java.base/java.util.concurrent.completablefuture.postcomplete(completablefuture。java:506)[信息]位于java.base/java.util.concurrent.completablefuture.complete(completablefuture。java:2073)[信息]在software.amazon.awssdk.core.internal.http.pipeline.stages.makeasynchttprequeststage.lambda$executehttprequest$1(makeasynchttprequeststage)。java:167)[信息]位于java.base/java.util.concurrent.completablefuture.uniwhencomplete(completablefuture。java:859)[信息]在java.base/java.util.concurrent.completablefuture$uniwhencomplete.tryfire(completablefuture)。java:837)[信息]位于java.base/java.util.concurrent.completablefuture$completion.run(completablefuture。java:478)[信息]位于java.base/java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1128)[信息]在java.base/java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:628)[信息]位于java.base/java.lang.thread.run(thread。java:834)[信息]原因:software.amazon.awssdk.services.sqs.model.sqsexception:null(服务:sqs,状态代码:500,请求id:null)[info]位于software.amazon.awssdk.services.sqs.model.sqsexception$builderimpl.build(sqsexception)。java:95)[信息]在software.amazon.awssdk.services.sqs.model.sqsexception$builderimpl.build(sqsexception。java:55)[信息]在software.amazon.awssdk.protocols.query.internal.unmarshall.awsxmlerorrunmarshaller.unmarshall(awsxmlerorrunmarshaller)。java:97)[信息]在software.amazon.awssdk.protocols.query.unmarshall.awsxmlerrotocolunmarshaller.handle(awsxmlerrotocolunmarshaller。java:102)[信息]在software.amazon.awssdk.protocols.query.unmarshall.awsxmlerrotocolunmarshaller.handle(awsxmlerrotocolunmarshaller)。java:82)[信息]位于software.amazon.awssdk.core.internal.http.async.asyncresponsehandler.lambda$prepare$0(asyncresponsehandler)。java:88)[信息]在java.base/java.util.concurrent.completablefuture$unicompose.tryfire(completablefuture)。java:1072)[信息]位于java.base/java.util.concurrent.completablefuture.postcomplete(completablefuture。java:506)[信息]位于java.base/java.util.concurrent.completablefuture.complete(completablefuture。java:2073)[信息]在software.amazon.awssdk.core.internal.http.async.asyncresponsehandler$subscriber.oncomplete(异步响应句柄)。java:129)[信息]在akka.stream.impl.reactivestreamscompliance$.tryoncomplete(reactivestreamscompliance。scala:114)[信息]在akka.stream.impl.fusing.actorgraphinterpreter$actorroutputboundary.complete(actorgraphinterpreter。scala:390) [信息]在akka.stream.impl.fusing.actorgraphinterpreter$actorroutputboundary.onupstreamfinish(actorgraphinterpreter。scala:416)[信息]位于akka.stream.impl.fusing.graphinterpeter.processevent(graphinterpeter。scala:523)[信息]在akka.stream.impl.fusing.graphinterpeter.execute(graphinterpeter。scala:390)[信息]在akka.stream.impl.fusing.graphinterpetershell.runbatch(actorgraphinterpreter。scala:625)[信息]位于akka.stream.impl.fusing.graphinterpetershell$asyncinput.execute(actorgraphinterpreter)。scala:502)[信息]位于akka.stream.impl.fusing.graphinterperetershell.processevent(actorgraphinterpreter。scala:600)[信息]在akka.stream.impl.fusing.actorgraphinterpreter.akka$stream$impl$fusing$actorgraphinterpreter$$processevent(actorgraphinterpreter)。scala:769)[信息]在akka.stream.impl.fusing.actorgraphinterpreter$$anonfun$接收$1.applyorelse(actorgraphinterpreter)。scala:784)[信息]在akka.actor.actor.aroundreceive(actor。scala:537)[信息]在akka.actor.actor.aroundreceive$(actor。scala:535)[信息]在akka.stream.impl.fusing.actorgraphinterpreter.aroundreceive(actorgraphinterpreter。scala:691)[信息]在akka.actor.actorcell.receivemessage(actorcell。scala:577)[信息]在akka.actor.actorcell.invoke(actorcell。scala:547)[信息]在akka.dispatch.mailbox.processmailbox(mailbox。scala:270) [信息]在akka.dispatch.mailbox.run(mailbox。scala:231)[信息]在akka.dispatch.mailbox.exec(mailbox。scala:243)[信息]位于java.base/java.util.concurrent.forkjointask.doexec(forkjointask。java:290)[信息]位于java.base/java.util.concurrent.forkjoinpool$workqueue.toplevelexec(forkjoinpool)。java:1020)[信息]在java.base/java.util.concurrent.forkjoinpool.scan(forkjoinpool。java:1656)[信息]位于java.base/java.util.concurrent.forkjoinpool.runworker(forkjoinpool。java:1594)[信息]位于java.base/java.util.concurrent.forkjoinworkerthread.run(forkjoinworke

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题