我们正在与MongoDB Kafka Connetor合作,在开源的ApacheKafka连接器上,将json数据从Mongo摄取到HDFS。我们有Kafka消费者,它读取Kafka中的数据更改并将它们写入HDFS文件。
我们希望将源连接器安排在不同时间的特定时间。
我们需要按预定日期触发Kafka消息。
nnt7mjpx1#
我们可以使用来自Confluent的源连接器的配置属性和定制轮询间隔来处理此场景
链接:
Https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/all-properties/#std-label-source-configuration-all-properties
==>poll.wait.time.ms可以是一个解决方案
否则,有Kafka消息调度器:
https://github.com/etf1/kafka-message-scheduler
使用排定程序自动使用Kafka中的数据
当您创建新的计划程序时,vkconfig脚本将执行以下步骤:
使用您为计划程序指定的名称创建新的Vertica架构。在配置过程中,您可以使用此名称来标识调度程序。
在新创建的架构中创建管理Kafka数据加载所需的表。
3bygqnnd2#
来自MongoDB Kafka Connect官方文档:
Https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/all-properties/#change-streams
使用以下配置设置可以为更改流指定聚合管道,并为更改流游标指定读取首选项。
Poll.wait.time.ms==在检查CHANGE STREAM游标是否有新结果之前等待的时间量(毫秒)。
或USE:poll.Max.Batch.size==轮询Change Stream游标以获取新数据时,单个批次要读入的最大文档数。您可以使用此设置来限制连接器内部缓冲的数据量。
2条答案
按热度按时间nnt7mjpx1#
我们可以使用来自Confluent的源连接器的配置属性和定制轮询间隔来处理此场景
链接:
Https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/all-properties/#std-label-source-configuration-all-properties
==>poll.wait.time.ms可以是一个解决方案
否则,有Kafka消息调度器:
https://github.com/etf1/kafka-message-scheduler
使用排定程序自动使用Kafka中的数据
当您创建新的计划程序时,vkconfig脚本将执行以下步骤:
使用您为计划程序指定的名称创建新的Vertica架构。在配置过程中,您可以使用此名称来标识调度程序。
在新创建的架构中创建管理Kafka数据加载所需的表。
3bygqnnd2#
来自MongoDB Kafka Connect官方文档:
Https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/all-properties/#change-streams
使用以下配置设置可以为更改流指定聚合管道,并为更改流游标指定读取首选项。
Poll.wait.time.ms==在检查CHANGE STREAM游标是否有新结果之前等待的时间量(毫秒)。
或USE:poll.Max.Batch.size==轮询Change Stream游标以获取新数据时,单个批次要读入的最大文档数。您可以使用此设置来限制连接器内部缓冲的数据量。