我尝试用一个sqs源代码和一个localstack测试来实现一个简单的工作流。如果我再加上 SqsAckSink
,也不适用于 SqsAckFlow
. 但如果我移除 SqsAckSink
就用它吧 Sink.seq()
,则测试通过。拥有 SqsAckSink
或者 SqsAckFlow
让考试永远悬着。我也在测试中启用了debug,看到同样的错误一次又一次地重复,使得图形重新启动,但这对我来说没有多大意义。我将在下面的代码片段之后发布错误消息。
代码是:
public class DefaultWorkflow implements Workflow {
private Function<Throwable, Supervision.Directive> errorStrategy;
private final ActorSystem actorSystem;
private FlowMonitor<ProvisioningResult> workflowMonitor;
private final String queueUrl;
private final SqsAsyncClient asyncClient;
@Inject
public DefaultWorkflow(ActorSystem actorSystem, String queueUrl) {
this.errorStrategy = exc -> (Supervision.Directive) Supervision.resume();
this.actorSystem = actorSystem;
this.queueUrl = queueUrl;
asyncClient =
SqsAsyncClient.builder()
.region(Region.of(Localstack.getDefaultRegion()))
.httpClient(AkkaHttpClient.builder().withActorSystem(actorSystem).build())
.endpointOverride(new URI(queueUrl))
.build();
doWork();
}
private Flow<Message, ProvisioningResult, NotUsed> buildFlow() {
return Flow.of(Message.class)
.via(Flow.of(Message.class).map(m -> ProvisioningResult.builder().body(m.body()).build()));
}
@Override
public Source<ProvisioningResult, FlowMonitor<ProvisioningResult>> getSource() {
Source<Message, NotUsed> sqsSource =
RestartSource.onFailuresWithBackoff(
Duration.ofSeconds(1), Duration.ofSeconds(2), 0.1, this::createSQSSource);
return sqsSource
.via(buildFlow())
.withAttributes(ActorAttributes.withSupervisionStrategy(errorStrategy))
.monitorMat(Keep.right());
}
private Source<Message, NotUsed> createSQSSource() {
SqsSourceSettings sqsSourceSettings = SqsSourceSettings.create().withMaxBatchSize(1);
return SqsSource.create(queueUrl, sqsSourceSettings, asyncClient);
}
@Override
public FlowMonitor<ProvisioningResult> getWorkflowMonitor() {
return this.workflowMonitor;
}
private void doWork() {
Pair<FlowMonitor<ProvisioningResult>, CompletionStage<Done>> run =
getSource()
.toMat(SqsAckSink.create(queueUrl, SqsAckSettings.create(), asyncClient), Keep.both())
.run(actorSystem);
workflowMonitor = run.first();
}
}
测试如下所示:
@Test
public void getSource_givenMessage_shouldProduceResult()
throws InterruptedException, ExecutionException, TimeoutException, URISyntaxException {
String sqsName = "sqs2";
String messageBody = "someMessage";
String sqsUrl = initSQS(sqsName);
generateSourceData(sqsUrl, messageBody);
this.defaultWorkflow = new DefaultWorkflow(this.actorSystem, sqsUrl);
Source<ProvisioningResult, FlowMonitor<ProvisioningResult>> source =
defaultWorkflow.getSource();
final CompletionStage<List<ProvisioningResult>> future =
source.take(1).runWith(Sink.seq(), materializer);
final List<ProvisioningResult> result = future.toCompletableFuture().join();
assertEquals(1, result.size());
assertEquals(result.get(0).getBody(), messageBody);
}
public void generateSourceData(String queueUrl, String messageBody) {
client
.sendMessage(
SendMessageRequest.builder().queueUrl(queueUrl).messageBody(messageBody).build())
.join();
}
private void initClient() throws URISyntaxException {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
AwsCredentials credentials = AwsBasicCredentials.create("somekey", "somevalue");
StaticCredentialsProvider provider = StaticCredentialsProvider.create(credentials);
client =
SqsAsyncClient.builder()
.region(Region.of(Localstack.getDefaultRegion()))
.httpClient(AkkaHttpClient.builder().withActorSystem(ActorSystem.create()).build())
.credentialsProvider(provider)
.endpointOverride(new URI(Localstack.INSTANCE.getEndpointSQS()))
.build();
}
protected String initSQS(String queueName) throws URISyntaxException {
initClient();
client
.createQueue(CreateQueueRequest.builder().queueName(queueName).build())
.join()
.queueUrl();
GetQueueUrlResponse response = client.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()).join();
System.out.println("Using queue " + response);
log.info("Using queue {}", response);
return response.queueUrl();
}
当我启用调试时,我看到这个错误会永远重复:
[info][debug]s.a.a.c.i.executioninterceptorchain-创建一个拦截器链,该链将按以下顺序应用拦截器:[software.amazon.awssdk.awscore.interceptor。helpfulunknownhostexceptioninterceptor@41a58812,software.amazon.awssdk.services.sqs.internal。messagemd5checksuminterceptor@4f626e56,software.amazon.awssdk.protocols.query.interceptor。queryparameterstobodyinterceptor@4b5de362][信息][调试]s.a.a.c.i.executioninterceptorchain-interceptor'software.amazon.awssdk.protocols.query.interceptor。queryparameterstobodyinterceptor@4b5de362'使用modifyhttprequest方法修改了消息[info][debug]s.a.a.request-发送请求:defaultsdkhttpfullrequest(httpmethod=post,protocol=http,host=localhost,port=4566,encodedpath=,headers=[amz sdk invocation id,content length,content type,user agent],queryparameters=[])[info][debug]s.a.a.s.aws4signer-aws4要签名的字符串:aws4-hmac-sha256[info]20201230t233655z[info]20201230/us-east-1/sqs/aws4\u request[info]5e95b0e6707472e57434cb0da9516ef2ef4c747760bda87e9207161f9834d6dc01[info][debug]s.a.request-收到的错误响应:500[info][debug]s.a.a.request-可重试的错误检测。将在47ms后重试。请求尝试次数2[info][debug]s.a.a.request-重试请求:defaultsdkhttpfullrequest(httpmethod=post,protocol=http,host=localhost,port=4566,encodedpath=,headers=[amz sdk调用id,内容长度,内容类型,用户代理],queryparameters=[])[info][debug]s.a.a.s.aws4signer-aws4要签名的字符串:aws4-hmac-sha256[info]20201230t233655z[info]20201230/us-east-1/sqs/aws4\u request[info]ebce46865d8fface363b83481672fa3a7a11f584f2ea7c1e2b56e3813afc[info][debug]s.a.request-收到的错误响应:500[info][debug]s.a.a.request-可重试的错误检测。将在51毫秒后重试。请求尝试次数3[info][debug]s.a.a.request-重试请求:defaultsdkhttpfullrequest(httpmethod=post,protocol=http,host=localhost,port=4566,encodedpath=,headers=[amz sdk调用id,内容长度,内容类型,用户代理],queryparameters=[])[info][debug]s.a.a.s.aws4signer-aws4要签名的字符串:aws4-hmac-sha256[info]20201230t233655z[info]20201230/us-east-1/sqs/aws4\u request[info]4085d30b4fa9cce89c435ed7a640539a665bc2bfd4322fafd4095d2cb58fab1[info][debug]s.a.request-收到的错误响应:500[info][debug]s.a.a.request-可重试的错误检测。将在230ms后重试。请求尝试次数4[info][debug]s.a.a.request-重试请求:defaultsdkhttpfullrequest(httpmethod=post,protocol=http,host=localhost,port=4566,encodedpath=,headers=[amz sdk调用id,内容长度,内容类型,用户代理],queryparameters=[])[info][debug]s.a.a.s.aws4signer-aws4要签名的字符串:aws4-hmac-sha256[info]20201230t233655z[info]20201230/us-east-1/sqs/aws4\u request[info]618037af1ce6008ce721fd3ca9cddde2e3f2893d50bd9ff5a241cc2061e1d67f[info][debug]s.a.request-收到的错误响应:500[info][warn]a.s.restartwackoffsource-由于故障而重新启动图形。堆栈跟踪:[info]java.util.concurrent.completionexception:software.amazon.awssdk.services.sqs.model.sqseexception:null(服务:sqs,状态代码:500,请求id:null)[info]位于software.amazon.awssdk.utils.completablefutureutils.errorascompletionexception(completablefutureutils)。java:60)[信息]位于software.amazon.awssdk.core.internal.http.pipeline.stages.asyncexecutionfailureexceptionreportingstage.lambda$execute$0(asyncexecutionfailureexceptionreportingstage)。java:51)[信息]在java.base/java.util.concurrent.completablefuture.unihandle(completablefuture。java:930)[信息]位于java.base/java.util.concurrent.completablefuture$unihandle.tryfire(completablefuture。java:907)[信息]位于java.base/java.util.concurrent.completablefuture.postcomplete(completablefuture。java:506)[信息]在java.base/java.util.concurrent.completablefuture.completeexceptionally(completablefuture。java:2088)[信息]位于software.amazon.awssdk.utils.completablefutureutils.lambda$forwardexception$0(completablefutureutils)。java:74)[信息]位于java.base/java.util.concurrent.completablefuture.uniwhencomplete(completablefuture。java:859)[信息]在java.base/java.util.concurrent.completablefuture$uniwhencomplete.tryfire(completablefuture)。java:837)[信息]位于java.base/java.util.concurrent.completablefuture.postcomplete(completablefuture。java:506)[信息]位于java.base/java.util.concurrent.completablefuture.completeexceptional(completablefuture。java:2088)[信息]在software.amazon.awssdk.core.internal.http.pipeline.stages.asyncretryablestage$retryingexecutor.maybeatTestExecute(asyncretryablestage)。java:85)[信息]位于software.amazon.awssdk.core.internal.http.pipeline.stages.asyncretryablestage$retryingexecutor.mayberetryexecute(asyncretryablestage)。java:144)[信息]在software.amazon.awssdk.core.internal.http.pipeline.stages.asyncretryablestage$retryingexecutor.lambda$attemptexecute$1(asyncretryablestage)。java:133)[信息]位于java.base/java.util.concurrent.completablefuture.uniwhencomplete(completablefuture。java:859)[信息]在java.base/java.util.concurrent.completablefuture$uniwhencomplete.tryfire(completablefuture)。java:837)[信息]位于java.base/java.util.concurrent.completablefuture.postcomplete(completablefuture。java:506)[信息]位于java.base/java.util.concurrent.completablefuture.complete(completablefuture。java:2073)[信息]在software.amazon.awssdk.core.internal.http.pipeline.stages.makeasynchttprequeststage.lambda$executehttprequest$1(makeasynchttprequeststage)。java:167)[信息]位于java.base/java.util.concurrent.completablefuture.uniwhencomplete(completablefuture。java:859)[信息]在java.base/java.util.concurrent.completablefuture$uniwhencomplete.tryfire(completablefuture)。java:837)[信息]位于java.base/java.util.concurrent.completablefuture$completion.run(completablefuture。java:478)[信息]位于java.base/java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1128)[信息]在java.base/java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:628)[信息]位于java.base/java.lang.thread.run(thread。java:834)[信息]原因:software.amazon.awssdk.services.sqs.model.sqsexception:null(服务:sqs,状态代码:500,请求id:null)[info]位于software.amazon.awssdk.services.sqs.model.sqsexception$builderimpl.build(sqsexception)。java:95)[信息]在software.amazon.awssdk.services.sqs.model.sqsexception$builderimpl.build(sqsexception。java:55)[信息]在software.amazon.awssdk.protocols.query.internal.unmarshall.awsxmlerorrunmarshaller.unmarshall(awsxmlerorrunmarshaller)。java:97)[信息]在software.amazon.awssdk.protocols.query.unmarshall.awsxmlerrotocolunmarshaller.handle(awsxmlerrotocolunmarshaller。java:102)[信息]在software.amazon.awssdk.protocols.query.unmarshall.awsxmlerrotocolunmarshaller.handle(awsxmlerrotocolunmarshaller)。java:82)[信息]位于software.amazon.awssdk.core.internal.http.async.asyncresponsehandler.lambda$prepare$0(asyncresponsehandler)。java:88)[信息]在java.base/java.util.concurrent.completablefuture$unicompose.tryfire(completablefuture)。java:1072)[信息]位于java.base/java.util.concurrent.completablefuture.postcomplete(completablefuture。java:506)[信息]位于java.base/java.util.concurrent.completablefuture.complete(completablefuture。java:2073)[信息]在software.amazon.awssdk.core.internal.http.async.asyncresponsehandler$subscriber.oncomplete(异步响应句柄)。java:129)[信息]在akka.stream.impl.reactivestreamscompliance$.tryoncomplete(reactivestreamscompliance。scala:114)[信息]在akka.stream.impl.fusing.actorgraphinterpreter$actorroutputboundary.complete(actorgraphinterpreter。scala:390) [信息]在akka.stream.impl.fusing.actorgraphinterpreter$actorroutputboundary.onupstreamfinish(actorgraphinterpreter。scala:416)[信息]位于akka.stream.impl.fusing.graphinterpeter.processevent(graphinterpeter。scala:523)[信息]在akka.stream.impl.fusing.graphinterpeter.execute(graphinterpeter。scala:390)[信息]在akka.stream.impl.fusing.graphinterpetershell.runbatch(actorgraphinterpreter。scala:625)[信息]位于akka.stream.impl.fusing.graphinterpetershell$asyncinput.execute(actorgraphinterpreter)。scala:502)[信息]位于akka.stream.impl.fusing.graphinterperetershell.processevent(actorgraphinterpreter。scala:600)[信息]在akka.stream.impl.fusing.actorgraphinterpreter.akka$stream$impl$fusing$actorgraphinterpreter$$processevent(actorgraphinterpreter)。scala:769)[信息]在akka.stream.impl.fusing.actorgraphinterpreter$$anonfun$接收$1.applyorelse(actorgraphinterpreter)。scala:784)[信息]在akka.actor.actor.aroundreceive(actor。scala:537)[信息]在akka.actor.actor.aroundreceive$(actor。scala:535)[信息]在akka.stream.impl.fusing.actorgraphinterpreter.aroundreceive(actorgraphinterpreter。scala:691)[信息]在akka.actor.actorcell.receivemessage(actorcell。scala:577)[信息]在akka.actor.actorcell.invoke(actorcell。scala:547)[信息]在akka.dispatch.mailbox.processmailbox(mailbox。scala:270) [信息]在akka.dispatch.mailbox.run(mailbox。scala:231)[信息]在akka.dispatch.mailbox.exec(mailbox。scala:243)[信息]位于java.base/java.util.concurrent.forkjointask.doexec(forkjointask。java:290)[信息]位于java.base/java.util.concurrent.forkjoinpool$workqueue.toplevelexec(forkjoinpool)。java:1020)[信息]在java.base/java.util.concurrent.forkjoinpool.scan(forkjoinpool。java:1656)[信息]位于java.base/java.util.concurrent.forkjoinpool.runworker(forkjoinpool。java:1594)[信息]位于java.base/java.util.concurrent.forkjoinworkerthread.run(forkjoinworke
暂无答案!
目前还没有任何答案,快来回答吧!