apache flink-无法对flinkkinesisconsumer使用本地kinesis

hc8w905p  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(401)

到目前为止,我已经按照Flink的动觉连接器使用本地动觉记录的说明。

使用非aws运动端点进行测试

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");

对于flink制作人,这些说明适用于本地的kinesis(我使用kinesalite)。
然而,对于flink消费者,我得到一个例外 aws.region 以及 aws.endpoint 两者都不允许。但是region是必需的,这意味着不可能覆盖端点。
org.apache.flink.client.program.programinvocationexception:main方法导致错误:对于flinkkinesisconsumer,必须在配置中设置aws region('aws.region')或aws endpoint('aws.endpoint')。
这是连接器中的错误吗?我看到一个相关的公关:https://github.com/apache/flink/pull/6045 .
我在flink的邮件列表上找到了一个解决方法,但是他们把这个描述为生产者的问题,而不是消费者的问题,而我看到了相反的情况(我想),所以对此不确定。真让人困惑。

1l5u6lss

1l5u6lss1#

自从提出这个问题以来,已经取得了一些进展。
询问者把这个问题推到第二次圣战中,被标记为第二次圣战的复制品。
这个问题现在应该已经解决了,并且1.10及更高版本的修复程序可用。

vhmi4jdf

vhmi4jdf2#

这个问题与 XOR 验证检查中的条件。如你所见 validateConsumerConfiguration 方法在if语句中执行异或验证。因此,只能指定选中的两个参数中的一个。

要设置自定义url,需要删除 AWSConfigConstants.AWS_REGION 属性并仅使用链接。

// Set the given URL
consumerConfig.put(AWSConfigConstants.AWS_ENDPOINT, URL);
// Remove the region
consumerConfig.remove(AWSConfigConstants.AWS_REGION);

此解决方案修复了与以下stacktrace相关的错误:

java.lang.IllegalArgumentException: For FlinkKinesisConsumer either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set in the config.

at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.validateConsumerConfiguration(KinesisConfigUtil.java:92)

相关问题