如何将数据从s3 bucket传输到kafka

h6my8fg2  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(777)

有关于将数据从kafka主题复制到s3的示例和文档,但是如何将数据从s3复制到kafka?

mwecs4sa

mwecs4sa1#

根据您的场景或上传对象所需的频率,您可以对每个事件(例如每次上传文件时)使用lambda函数,也可以将其作为cron。这个lambda作为生产者使用kafkaapi并发布到一个主题。
具体内容:
lambda函数的触发器可以是 s3:PutObject 直接来自s3或cloudwatch事件的事件。
如果不需要立即使用对象,可以将lambda作为cron运行。在这种情况下,另一种方法可能是在一个ec2示例上运行cron,该示例具有kafka生产者和从s3读取对象的权限,并不断将对象推送到kafka主题。

ukxgm1gy

ukxgm1gy2#

当你读一个s3对象时,你会得到一个字节流。你可以发送任何字节数组给Kafka ByteArraySerializer .
或者您可以将该inputstream解析为某个自定义对象,然后使用您可以配置的任何序列化程序发送该对象。
你可以在这里找到一个Kafka连接过程的例子(我假设你将其与confluent的s3连接编写器进行比较)-https://jobs.zalando.com/tech/blog/backing-up-kafka-zookeeper/index.html 可以配置为从s3读取二进制存档或行删除的文本。
类似地,apachespark、flink、beam、nifi等simlarhadoop相关工具也可以从s3读取事件,并将事件写入kafka。
这种方法的问题是,您需要跟踪到目前为止已读取的文件,以及处理部分读取的文件。

相关问题