我有Kafka生产者发布消息初始化连接每次。我厌倦了在服务器启动时创建连接,而在服务器厌倦了发布消息时使用相同的连接—因为消息流以十亿计。在某个时间点之后,由于与kafka代理的许多开放连接,服务器将关闭。下面是服务器上使用的代码段。你知道如何在服务器启动期间为producer创建kafka连接,并在需要发布消息时继续使用该连接,或者创建一些“x”连接,并继续使用“x”连接中的连接吗
class KafkaProduce{
private var kafkaProducer:KafkaProducer = new KafkaProduer[String,String](getProeperties)
@volatile private var isClosed = false
def close:Unit={
if(!isClosed){
producer.close
isClosed=true
}
def instance():KafkaProducer[String,String]={
this.synchronized{
if(!isClosed) producer
else{producer = new KafkaProducer[String,String](getProperties)}
}
}
producer
}
def getProperties:Properties = {
val properties = new Properties()
properties.put("bootstrap.servers","")
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
properties.put("acks",0.toString)
properties.put("producer.type","sync")
}
def writeMessagetoKafka(message:String):Unit{
try{
val sendProducer = instance
val record = new ProducerRecord[String,String](topic,message)
sendProducer.send(record)
}
catch{
case ee:Exception =>
val eWriter = new StringWriter
ee.printStackTrace(new PrintWriter(eWriter))
}
}
}
抽象类中调用上述生产者类的函数
public abstract Calling{
/* some methods*/
private void sendMessage(){
KafkaProduce producer = new KafkaProduce();
producer.writeMessagetoKafka("hello");
}
}
kafka客户端版本:2.4.1 scala版本:2.12.6服务器:weblogic 12c在我们组织的服务器上没有使用weblogic osb服务的选项
暂无答案!
目前还没有任何答案,快来回答吧!