从java创建“kafkaserver”

wvmv3b1j  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(285)

我正在尝试从java启动kafka服务器
具体来说,如何将这一行scala转换成一行java?

private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)

我可以轻松地创建serverconfig,但似乎无法创建 kafkaMetricsReporters 参数。
注意:我可以创建 KafkaServerStartable 但我想创造一个正常的 KafkaServer 以避免jvm在出现错误时退出。
apache kafka版本0.11.0.1

uoifb46i

uoifb46i1#

这个 kafkaMetricsReporters 参数是scala Seq .
您可以:
创建java集合并将其转换为序列:
你需要导入 scala.collection.JavaConverters :

List<KafkaMetricsReporter> reportersList = new ArrayList<>();
...
Seq<KafkaMetricsReporter> reportersSeq = JavaConverters.asScalaBufferConverter(reportersList).asScala();

使用 KafkaMetricsReporter.startReporters() 方法从配置中为您创建它们:
作为 KafkaMetricsReporter 是独生子,你需要使用 MODULE 使用符号:

Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));

还有 KafkaServer 构造函数还有另外两个参数,从java调用时需要这些参数: time 可以使用
new org.apache.kafka.common.utils.SystemTime() threadNamePrefix 是一种选择。如果你导入 scala.Option ,您可以打电话 Option.apply("prefix") 综合起来:

Properties props = new Properties();
props.put(...);
KafkaConfig config = KafkaConfig.fromProps(props);
Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));
KafkaServer server = new KafkaServer(config, new SystemTime(), Option.apply("prefix"), reporters);
server.startup();

相关问题