如何在kafka connect api中设置max.poll.records

w8biq8rn  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(835)

我正在使用confluent-3.0.1平台并构建kafka elasticsearch连接器。为此,我扩展了sinkconnector和sinktask(kafka连接api)以从kafka获取数据。
作为这段代码的一部分,我扩展了sinkconnector的taskconfigs方法以返回“max.poll.records”,一次只获取100条记录。但它不工作,我在同一时间得到所有记录,我没有提交规定的时间内抵消。请任何人帮我配置“max.poll.records”

public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
    for (int i = 0; i < maxTasks; i++) {
      Map<String, String> config = new HashMap<String, String>();
      config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
      config.put(ConfigurationConstants.HOSTS, hosts);
      config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
      config.put(ConfigurationConstants.IDS, elasticSearchIds);
      config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
      config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
      config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
      config.put("max.poll.records", "100");

      configs.add(config);
    }
    return configs;
  }
bvuwiixz

bvuwiixz1#

你不能覆盖大多数Kafka消费者的配置 max.poll.records 在连接器配置中。不过,您可以在connect worker配置中使用 consumer. 前缀。

hgc7kmma

hgc7kmma2#

事情解决了。我在connect-avro-standalone.properties中添加了以下配置

group.id=mygroup
 consumer.max.poll.records=1000

在命令下运行我的连接器。

sh ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/connect-elasticsearch-sink.properties

相关问题