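(Apache Flink 1.8 on AWS EMR release label 5.28.x)
Our data source is an AWS Kinesis stream (with 450 shards, if that matters). We use the FlinkKinesisConsumer to read the Kinesis stream. Our application occasionally (about once every couple of days) crashes with a "Target server failed to respond" error. The full stack trace is at the bottom.
Looking further into the codebase, I found that ProvisionedThroughputExceededException is the only exception type that gets retried.
1. Why doesn't the Kinesis connector retry on a transient HTTP response exception?
2. Is there a way to pass in a retry configuration that retries on these errors?
As a side note, we have set the following retry configuration -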
env.setRestartStrategy(RestartStrategies.failureRateRestart(12,
org.apache.flink.api.common.time.Time.of(60, TimeUnit.MINUTES),
org.apache.flink.api.common.time.Time.of(300, TimeUnit.SECONDS)));
Full stack trace of the exception -
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:250)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:400)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2 Answers
wj8zmpe11#
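The restart strategy you are configuring with env.setRestartStrategy() is about restarting the entire Flink job in case of a failure. It does not affect how the Flink Kinesis connector retries individual calls. The Kinesis consumer has its own configuration settings for changing the retry behavior (as of 1.11).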
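As a rough sketch of what the getRecords retry settings can look like, here is a Properties object of the kind passed to the FlinkKinesisConsumer constructor. The keys are, to my knowledge, the string values behind ConsumerConfigConstants (SHARD_GETRECORDS_RETRIES and the three SHARD_GETRECORDS_BACKOFF_* constants), so check them against the connector version you run. The class name KinesisRetryConfig and the numeric values are made up for illustration, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper; only java.util is used so it compiles without Flink on the classpath.
public class KinesisRetryConfig {

    /** Builds consumer properties with more patient getRecords retry settings. */
    public static Properties build(String region) {
        Properties config = new Properties();
        config.setProperty("aws.region", region);
        // Maximum number of retries for a failed getRecords call (illustrative value).
        config.setProperty("flink.shard.getrecords.maxretries", "10");
        // Base and maximum backoff between retries, in milliseconds (illustrative values).
        config.setProperty("flink.shard.getrecords.backoff.base", "500");
        config.setProperty("flink.shard.getrecords.backoff.max", "10000");
        // Growth factor applied to the backoff on each further retry.
        config.setProperty("flink.shard.getrecords.backoff.expconst", "1.5");
        return config;
    }

    public static void main(String[] args) {
        // In a job this object would be handed to the FlinkKinesisConsumer constructor.
        build("us-east-1").list(System.out);
    }
}
```

Note that these settings only take effect for errors the connector already treats as recoverable, so on their own they may not help with the "failed to respond" error from the question.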
ldfqzlk82#
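KinesisProxy supports retrying on exceptions, and the retry behavior can be controlled with the settings mentioned in the previous answer. However, not every exception is retried, and the default whitelist does not cover all of the transient issues that commonly occur with the Kinesis service. Over time, we customized the proxy to arrive at a stable production setup.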
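To illustrate the idea of a wider whitelist (this is not the proxy code from this answer), here is a dependency-free sketch: a predicate that walks the cause chain for exception types treated as transient, plus a retry loop with full-jitter exponential backoff. Everything in it is an assumption for illustration: the class TransientRetry, the list of exception names, and matching by simple class name, which is only there to avoid needing the shaded AWS SDK on the classpath. In the connector itself the equivalent check would, as far as I can tell, go into a KinesisProxy subclass that overrides its recoverable-exception check; verify the exact method against your Flink version.

```java
import java.util.Random;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Flink or the AWS SDK.
public class TransientRetry {

    /** Returns true if the error or one of its causes has a type treated as transient here. */
    public static boolean isTransient(Throwable error) {
        int depth = 0;
        // The depth bound guards against unusually long or cyclic cause chains.
        for (Throwable t = error; t != null && depth < 20; t = t.getCause(), depth++) {
            String name = t.getClass().getSimpleName();
            if (name.equals("NoHttpResponseException")
                    || name.equals("SocketTimeoutException")
                    || name.equals("ProvisionedThroughputExceededException")) {
                return true;
            }
        }
        return false;
    }

    /** Picks a random delay in [0, min(maxMs, baseMs * power^attempt)). */
    public static long fullJitterBackoff(long baseMs, long maxMs, double power,
                                         int attempt, Random rnd) {
        long ceiling = (long) Math.min(maxMs, baseMs * Math.pow(power, attempt));
        return (long) (rnd.nextDouble() * ceiling);
    }

    /** Runs the call, retrying up to maxRetries times when the failure looks transient. */
    public static <T> T callWithRetry(Callable<T> call, int maxRetries,
                                      long baseMs, long maxMs) throws Exception {
        Random rnd = new Random();
        for (int attempt = 0; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                // Give up on non-transient errors or once the retry budget is spent.
                if (attempt >= maxRetries || !isTransient(e)) {
                    throw e;
                }
                Thread.sleep(fullJitterBackoff(baseMs, maxMs, 1.5, attempt, rnd));
            }
        }
    }
}
```

Retrying inside the proxy keeps a single dropped connection from failing the whole job, so the job-level restart strategy is only used for failures that persist.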