When I try to connect the IIDR replication engine for Kafka to a Kafka cluster through ZooKeeper, I get the following error:
kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://broker:29092","PLAINTEXT_HOST://localhost:9092"],"jmx_port":9101,"host":"broker","timestamp":"1598174513950","port":29092,"version":4}
at kafka.cluster.Broker$.createBroker(Broker.scala:101)
at kafka.utils.ZkUtils.getBrokerInfo(ZkUtils.scala:787)
at kafka.utils.ZkUtils$$anonfun$getAllBrokersInCluster$2.apply(ZkUtils.scala:162)
at kafka.utils.ZkUtils$$anonfun$getAllBrokersInCluster$2.apply(ZkUtils.scala:162)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.utils.ZkUtils.getAllBrokersInCluster(ZkUtils.scala:162)
at com.datamirror.ts.target.publication.KafkaTargetPublisherProxy.getBootstrapBrokers(KafkaTargetPublisherProxy.java:829)
at com.datamirror.ts.target.publication.KafkaTargetPublisherProxy.loadKafkaServicesInfo(KafkaTargetPublisherProxy.java:417)
at com.datamirror.ts.target.publication.KafkaTargetPublisherProxy.handleStartReplicateMessage(KafkaTargetPublisherProxy.java:139)
at com.datamirror.ts.enginemsg.MessageDispatcher.dispatchSwitch(MessageDispatcher.java:547)
at com.datamirror.ts.enginemsg.MessageDispatcher.dispatch(MessageDispatcher.java:142)
at com.datamirror.ts.engine.ReplicationSession.dispatchDynamicMessage(ReplicationSession.java:2816)
at com.datamirror.ts.engine.ReplicationSession.dispatchDataMessage(ReplicationSession.java:2910)
at com.datamirror.ts.target.publication.TargetDataChannelJob.moderateForTheTarget(TargetDataChannelJob.java:178)
at com.datamirror.ts.target.publication.TargetDataChannelJob.execute(TargetDataChannelJob.java:74)
at com.datamirror.ts.engine.component.PipelineThread.runThread(PipelineThread.java:217)
at com.datamirror.ts.util.TsThread.run(TsThread.java:130)
Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.protocol.SecurityProtocol.PLAINTEXT_HOST
at java.lang.Enum.valueOf(Enum.java:249)
at org.apache.kafka.common.protocol.SecurityProtocol.valueOf(SecurityProtocol.java:28)
at org.apache.kafka.common.protocol.SecurityProtocol.forName(SecurityProtocol.java:89)
at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:90)
at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:89)
at scala.collection.immutable.List.map(List.scala:277)
at kafka.cluster.Broker$.createBroker(Broker.scala:89)
It looks like there is a compatibility issue between IIDR and my Kafka cluster. Any suggestions on how to overcome this would be appreciated.
System setup
- IIDR v11.4 with the Kafka engine
- Confluent Kafka quick-start Docker setup from https://github.com/confluentinc/cp-all-in-one (the cp-all-in-one directory)
1 answer
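For Kafka with the IIDR product, the recommendation is to use bootstrap.servers and list the brokers in the kafkaproducer.properties and kafkaconsumer.properties files. If you need to keep using ZooKeeper, contact L2 support about the ZooKeeper configuration, but we strongly recommend using the bootstrap.servers parameter, as Apache Kafka itself advises.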
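As a rough sketch of that setting, the fragment below shows what the entry might look like. The host and port values are taken from the broker registration printed in the error message. Which listener is reachable depends on where the IIDR engine runs, so both choices are assumptions to check against your own setup.

```properties
# kafkaproducer.properties (the same entry would go in kafkaconsumer.properties)

# Assumption: the IIDR engine runs on the Docker host, so it reaches the broker
# through the PLAINTEXT_HOST listener advertised as localhost:9092.
bootstrap.servers=localhost:9092

# Assumption: if the engine runs inside the same Docker network instead,
# the internal PLAINTEXT listener from the same registration would apply:
# bootstrap.servers=broker:29092
```

With more than one broker, bootstrap.servers takes a comma-separated list of host:port pairs.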