从apache beam(gcp数据流)写入confluentcloud

4smxwvx5  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(570)

我正在尝试从dataflow(apache beam)写入到confluent cloud/kafka,使用以下方法:

kafkaKnowledgeGraphKVRecords.apply("Write to Kafka", KafkaIO.<String, String>write()
                                .withBootstrapServers("<mybootstrapserver>.confluent.cloud:9092")
                                .withTopic("testtopic").withKeySerializer(StringSerializer.class)
                                .withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));

哪里 Map<String, Object> props = new HashMap<>(); (即暂时清空)
在日志中,我得到: send failed : 'Topic testtopic not present in metadata after 60000 ms.' 这个主题确实存在于这个集群中——所以我猜登录有问题,这是有道理的,因为我找不到传递apikey的方法。
我确实尝试了各种组合来将我从合流云中得到的apikey/秘密传递给auth props 但我找不到一个工作设置。

jgwigjjp

jgwigjjp1#

找到了一个解决方案,多亏了问题下方@robinmoffatt评论中的指针
以下是我现在的设置:

Map<String, Object> props = new HashMap<>()

props.put("ssl.endpoint.identification.algorithm", "https");
props.put("sasl.mechanism", "PLAIN");
props.put("request.timeout.ms", 20000);
props.put("retry.backoff.ms", 500);
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<APIKEY>\" password=\"<SECRET>";");
props.put("security.protocol", "SASL_SSL");

kafkaKnowledgeGraphKVRecords.apply("Write to Kafka-TESTTOPIC", KafkaIO.<String, String>write()
    .withBootstrapServers("<CLUSTER>.confluent.cloud:9092")
    .withTopic("test").withKeySerializer(StringSerializer.class)
    .withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));

我说错的关键是 sasl.jaas.config (注意 ; 最后!)

相关问题