我使用kafka客户机库来创建一个kafka主题的使用者。kafka代理(producer)位于限制网络中,因此只有一些ip可以访问该服务。所以我需要在kafka客户机(consumer)上配置一个代理来连接。在consumerconfig中,可以在consumer构造函数中传递的没有代理条目。
创建使用者的方法:
public static Consumer<Long, String> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, IKafkaConstants.GROUP_ID_CONFIG);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, IKafkaConstants.MAX_POLL_RECORDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, IKafkaConstants.OFFSET_RESET_EARLIER);
Consumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(IKafkaConstants.TOPIC_NAME));
return consumer;
}
如何为此使用者配置代理?
1条答案
按热度按时间1bqhqjot1#
Kafka的客户必须直接与经纪人交谈。
任何代理都需要知道每个单独的地址,因此无论如何都无法实现拥有单个负载平衡代理的目的。kafka协议处理自己的负载平衡和引导。
在这样的环境中,您可以说服网络团队让ssl kafka流量通过,或者安装kafka rest代理并通过http(s)发送流量