Kafka连接工厂-生产商

ct2axkht  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(222)

我有Kafka生产者发布消息初始化连接每次。我厌倦了在服务器启动时创建连接,而在服务器厌倦了发布消息时使用相同的连接—因为消息流以十亿计。在某个时间点之后,由于与kafka代理的许多开放连接,服务器将关闭。下面是服务器上使用的代码段。你知道如何在服务器启动期间为producer创建kafka连接,并在需要发布消息时继续使用该连接,或者创建一些“x”连接,并继续使用“x”连接中的连接吗

class KafkaProduce{
private var kafkaProducer:KafkaProducer = new KafkaProduer[String,String](getProeperties)
@volatile private var isClosed = false

def close:Unit={
if(!isClosed){
producer.close
isClosed=true
}

def instance():KafkaProducer[String,String]={
this.synchronized{
if(!isClosed) producer
else{producer = new KafkaProducer[String,String](getProperties)}
}
}
producer
}
def getProperties:Properties = {
val properties = new Properties()
properties.put("bootstrap.servers","")
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
properties.put("acks",0.toString)
properties.put("producer.type","sync")
}
def writeMessagetoKafka(message:String):Unit{
try{
val sendProducer = instance
val record = new ProducerRecord[String,String](topic,message)
sendProducer.send(record)
}
catch{
case ee:Exception =>
val eWriter = new StringWriter
ee.printStackTrace(new PrintWriter(eWriter))
}
}
}

抽象类中调用上述生产者类的函数

public abstract Calling{
/* some methods*/
private void sendMessage(){
 KafkaProduce producer = new KafkaProduce();
 producer.writeMessagetoKafka("hello");
}

}

kafka客户端版本:2.4.1 scala版本:2.12.6服务器:weblogic 12c在我们组织的服务器上没有使用weblogic osb服务的选项

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题