我想出了下面的类,它接受3个参数并返回消耗的Kafka消息。
package kafka
@Grab(group='org.apache.kafka', module='kafka-clients', version='2.8.0')
@Grab(group='org.slf4j', module='slf4j-simple', version='2.0.0')
import com.dell.techops.TechopsKafkaProducer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import groovy.util.logging.Slf4j
class KConsumer {
def consumer
KConsumer(String NexusServers, String topic, String groupId) {
def props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe([topic])
}
List<String> consumeMessages() {
def messages = []
while (true) {
def records = consumer.poll(100)
if (!records.isEmpty()) {
records.each { record ->
messages.add(record.value())
}
return messages
}
}
}
void close() {
consumer.close()
}
}
使用下面的类,如(代码片段。)
def servers = "localhost:9092"
def topic = "my-topic"
def groupId = "my-group"
def consumer = new KConsumer(servers, topic, groupId)
messages = consumer.consumeMessages()
consumer.close()
一切正常,但是,我有兴趣传递一个名为key
的可选参数,如果通过,它应该只返回该特定键的消息。我有这样的逻辑。
def messages = []
def records = consumer.poll(100)
for (ConsumerRecord<String, String> record : records) {
if (record.key() == key) {
messages.add(record.value())
}
}
这里的问题是,我是否必须使类在默认情况下必须接受4个参数,或者在Groovy中是否有类似于Python的*args
选项。我想将key
参数作为一个可选参数处理。另外,我是否必须提出一个新的函数来满足这一点,或者是否可以重新使用consumeMessages
函数。
1条答案
按热度按时间ljsrvy3e1#
Groovy的基础知识是,如果调用方法时没有使用可选参数,可以定义一个可选参数并使用默认值填充它: