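I am reading data from Cassandra, applying some transformations to it, and then sending the result to Kafka with the batch .save() method. I also set properties through a Kafka producer. Every time, I get the following error: Caused by: org.apache.kafka.common.errors.TimeoutException: Topic not present in metadata after 60000 ms. All configuration and credentials are set. The same code runs fine locally, where no SASL mechanism is involved, but on the cluster I get the exception above. Please help.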
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
System.setProperty("java.security.auth.login.config","/apps/xxxx/jaas.conf")
val props = new Properties()
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
props.put("acks","all")
props.put("bootstrap.servers","xxxxxx1:9095,xxxxxx2:9095,xxxxx3:9095")
props.put("ssl.truststore.location","/home/xxxxx/ocrptrust.jks")
props.put("ssl.truststore.password","xxxxxxxxx")
props.put("sasl.mechanism","SCRAM-SHA-512")
props.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
props.put("security.protocol","SASL_SSL")
val producerConfig = new KafkaProducer[String,String](props)
jsonRead.selectExpr("CAST(householdID AS STRING) AS key", "to_json(struct(*)) AS value")
  .write.format("kafka")
  .option("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
  .option("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
  .option("acks","all")
  .option("ssl.truststore.location","/home/xxxxx/ocrptrust.jks")
  .option("ssl.truststore.password","xxxxxx")
  .option("sasl.mechanism","SCRAM-SHA-512")
  .option("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
  .option("security.protocol","SASL_SSL")
  .option("kafka.bootstrap.servers","xxxxxx1:9095,xxxxxx2:9095,xxxxx3:9095")
  .option("topic", "xxxxxxxxxxx")
  .save()
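One thing that may be worth checking in the write above: Spark's Kafka sink only forwards options whose names start with kafka. to the underlying producer, and it manages the key and value serializers itself. Options such as security.protocol or sasl.mechanism without that prefix are likely not reaching the producer, which would leave it talking plaintext to a SASL_SSL listener. That could explain a metadata timeout on the cluster but not locally. The separate KafkaProducer built from props is also not the one .save() uses. The following is a minimal sketch, not a confirmed fix; KafkaSinkOptions and toSparkOptions are hypothetical helper names introduced only for illustration.

```scala
// Hypothetical helper (not part of Spark or Kafka): converts plain Kafka
// producer settings into option names for Spark's "kafka" data source.
object KafkaSinkOptions {
  // Spark's Kafka sink sets the serializers itself, so these are dropped.
  private val managedBySpark = Set("key.serializer", "value.serializer")

  // Adds the "kafka." prefix to every remaining setting that lacks it.
  def toSparkOptions(producerSettings: Map[String, String]): Map[String, String] =
    producerSettings
      .filter { case (name, _) => !managedBySpark.contains(name) }
      .map { case (name, value) =>
        val prefixed = if (name.startsWith("kafka.")) name else s"kafka.$name"
        prefixed -> value
      }
}

// Assumed usage with the DataFrame from the question (requires Spark, so it
// is shown as a comment; the values are the placeholders from the question):
//
// val sinkOptions = KafkaSinkOptions.toSparkOptions(Map(
//   "bootstrap.servers"       -> "xxxxxx1:9095,xxxxxx2:9095,xxxxx3:9095",
//   "security.protocol"       -> "SASL_SSL",
//   "sasl.mechanism"          -> "SCRAM-SHA-512",
//   "ssl.truststore.location" -> "/home/xxxxx/ocrptrust.jks"
//   // plus ssl.truststore.password, sasl.jaas.config and acks as in the question
// ))
// jsonRead.selectExpr("CAST(householdID AS STRING) AS key", "to_json(struct(*)) AS value")
//   .write.format("kafka")
//   .options(sinkOptions)
//   .option("topic", "xxxxxxxxxxx")
//   .save()
```

If the prefixed options alone do not help, it may also be worth confirming that the truststore path exists on every executor node, since the producer is created on the executors rather than only on the driver.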