我想创建一个自定义Apache Flink Sink to AWS Sagemaker Feature store,但是Flink的网站上没有关于如何创建自定义sink的文档。还有多个基类可以扩展(例如AsyncSinkBase
,RichSinkFunction
),所以我不确定要使用哪个。
我正在寻找关于如何实现自定义接收器的指导方针(一般情况下和我的特定用例)。对于我的特定用例:Sagemaker Feature Store有一个同步客户端,它通过putRecord调用将记录发送到AWS Sagemaker FS,因此我正在寻找一种方法来创建一个可以很好地与此客户端配合使用的自定义接收器。我需要at least once
的处理保证,因为Sagemaker FS是DynamoDB(一个键-值存储)。
- Java客户端:https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/sagemakerfeaturestoreruntime/AmazonSageMakerFeatureStoreRuntime.html
- 使用Python客户端的putRecord调用示例:www.example.comhttps://github.com/aws-samples/amazon-sagemaker-feature-store-streaming-aggregation/blob/main/src/lambda/StreamingIngestAggFeatures/lambda_function.py#L31
"我迄今为止的发现"
- 一些旧的文章说使用
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
和SinkFunction
- 某些连接器使用
org.apache.flink.connector.base.sink.writer
中的类(例如AsyncSinkWriter
、AsyncSinkBase
) - Flink文档的这一部分说明在创建自定义源代码时使用
org.apache.flink.connector.base.source.reader
中的SourceReaderBase
;SourceBaseReader似乎是上面项目符号中接收器类的等效源
任何帮助/指导/见解都非常感谢,谢谢。
1条答案
按热度按时间7gcisfzg1#
如何扩展RichAsyncFunction?
您可以在此处找到类似的示例- https:nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api