我的本地设置包括 local apache-flink
(通过brew安装)和 localstack
在kinesis服务运行的情况下。
我的 Docker
localstack:
image: localstack/localstack:0.10.7
environment:
- SERVICES=kinesis
ports:
- "4568:4568"
还有我的运动消费者:
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "123");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "123");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ENDPOINT, "http://localhost:4568");
但是当我运行flink程序时,我得到了一个错误:
原因:org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.amazonkinesexception:null(服务:amazonkinesis;状态代码:502;错误代码:空;请求id:空)
只有在使用 localstack
. 如果我用我的aws帐户连接到我的kinesis流,它会工作得很好。
3条答案
按热度按时间9lowa7mx1#
如果您使用的是java,那么可以使用
jar
一些amazon组件的库:首先,您需要在
pom.xml
为了能够在测试期间直接初始化localstack:然后,如果需要使用
kinesis
以及dynamo
,因为最新的一个aws
与最新版本的localstack不兼容:现在您可以使用以下注解来使用docker示例化堆栈,如果系统中不存在图像,则会自动提取图像。因此,不需要运行任何docker/docker compose映像。
现在,如果你需要初始化
DynamoDB
客户端可以使用以下行:现在,如果你需要初始化
Kinesis
客户端,您可以使用以下行:如果需要从kinesis(testpurpouse)读取数据,可以使用以下代码段作为模板(https://gist.github.com/alessiosavi/4ea88d73d6853de695843631207b7bc6):
oknwwptz2#
在添加flinkkinesisconsumer作为源之前,请添加以下行:
System.setProperty("com.amazonaws.sdk.disableCbor", "true") System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor", "true")
这与导出env变量的效果相同,但将其放入代码中可以减少设置环境所花费的时间。whhtz7ly3#
我们需要通过env var禁用cbor和cert检查,并在同一控制台中启动flink