在多线程环境中使用kafka consumer编写测试

xlpyo6sf  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(368)

我试图在一个单独的线程中创建一个kafka使用者,该线程使用来自kafka主题的数据。为此,我延长了 ShutdownableThread 抽象类和提供的实现 doWork 方法。我的代码是这样的-

abstract class MyConsumer(topic: String) extends ShutdownableThread(topic) {
    val props: Properties = ???
    private val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(List(topic).asJava)

    def process(value: String): Unit // Abstract method defining what to do with each record

    override def doWork(): Unit = {
        for (record <- consumer.poll(Duration.ofMillis(1000)).asScala)
            process(record.value())
    }
}

现在在我的测试中,我创建了 process() 方法,它只是对一个变量进行变异,然后调用 start() 启动线程的方法。

var mutVar = "initial_value"

val consumer = new MyConsumer("test_topic") {
    override def process(value: String): Unit = mutVar = "updated_value"
}

consumer.start()
assert(mutVar === "updated_value")

使用者确实使用了来自kafka的消息,但是在测试完成之前它没有更新它,因此测试失败。所以,我试着把主线放在睡眠上。但它抛出 ConcurrentModificationException 消息异常- KafkaConsumer is not safe for multi-threaded access 你知道我的方法有什么问题吗?提前谢谢。

sigwle7e

sigwle7e1#

必须将主线程休眠几秒钟,以允许使用者使用来自kafka主题的消息并将其存储在可变变量中。添加- Thread.sleep(5000) 启动耗电元件后。

相关问题