python:如何为单元测试模拟kafka主题?

8wtpewkr  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(623)

我们有一个消息调度程序,它从消息属性生成一个散列密钥,然后将其放入带有该密钥的kafka主题队列中。
这样做是为了消除重复。但是,我不确定在不实际设置本地群集并检查其是否按预期运行的情况下如何测试此重复数据消除。
在网上搜索模仿Kafka主题队列的工具并没有起到任何作用,我担心我可能是想错了。
最终,无论使用什么来模拟kafka队列,其行为都应该与本地集群相同,即提供对主题队列的密钥插入的复制。
有这样的工具吗?

9w11ddsr

9w11ddsr1#

如果您需要验证特定于Kafka的功能,或者使用特定于Kafka的功能实现,那么唯一的方法就是使用Kafka!
Kafka是否对其重复数据消除逻辑进行了测试?如果是这样的话,下面的组合可能足以减轻您的组织所感知到的失败风险:
哈希逻辑的单元测试(确保相同的对象确实生成相同的哈希)
kafka主题重复数据消除测试(kafka项目内部)
飞行前烟雾测试验证你的应用程序与Kafka的集成
如果kafka没有围绕其主题进行任何类型的重复数据消除测试,或者您担心破坏更改,那么围绕kafka特定功能进行自动检查就很重要。这可以通过集成测试来完成。我最近在基于docker的集成测试管道方面取得了很大成功。在创建kafka docker映像的初始工作(社区可能已经提供了一个映像)之后,设置集成测试管道就变得很简单了。管道可能看起来像:
执行基于应用程序的单元测试(哈希逻辑)
一旦这些通过,您的ci服务器就会启动kafka
执行集成测试,验证重复写入只向主题发出一条消息。
我认为重要的是要确保Kafka集成测试最小化,只包括完全依赖Kafka特定功能的测试。即使使用docker compose,它们也可能比单元测试慢几个数量级,1毫秒对1秒钟?另一个需要考虑的问题是,维护集成管道的开销可能值得冒着相信kakfa将提供其声称的主题重复数据消除的风险。

mf98qq94

mf98qq942#

为了用sbt测试任务模拟kafka uder python单元测试,我做了如下工作。应安装Pypark。
在build.sbt中定义应与测试一起运行的任务:

val testPythonTask = TaskKey[Unit]("testPython", "Run python tests.")

val command = "python3 -m unittest app_test.py"
val workingDirectory = new File("./project/src/main/python")

testPythonTask := {
  val s: TaskStreams = streams.value
  s.log.info("Executing task testPython")
  Process(command,
    workingDirectory,
    // arguments for using org.apache.spark.streaming.kafka.KafkaTestUtils in Python
    "PYSPARK_SUBMIT_ARGS" -> "--jars %s pyspark-shell"
      // collect all jar paths from project
      .format((fullClasspath in Runtime value)
      .map(_.data.getCanonicalPath)
        .filter(_.contains(".jar"))
        .mkString(",")),
    "PYSPARK_PYTHON" -> "python3") ! s.log
}

//attach custom test task to default test tasks
test in Test := {
  testPythonTask.value
  (test in Test).value
}

testOnly in Test := {
  testPythonTask.value
  (testOnly in Test).value
}

在python测试用例(app\u test.py)中:

import random
import unittest
from itertools import chain

from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.tests import PySparkStreamingTestCase

class KafkaStreamTests(PySparkStreamingTestCase):
    timeout = 20  # seconds
    duration = 1

    def setUp(self):
        super(KafkaStreamTests, self).setUp()

        kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
            .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
        self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
        self._kafkaTestUtils.setup()

    def tearDown(self):
        if self._kafkaTestUtils is not None:
            self._kafkaTestUtils.teardown()
            self._kafkaTestUtils = None

        super(KafkaStreamTests, self).tearDown()

    def _randomTopic(self):
        return "topic-%d" % random.randint(0, 10000)

    def _validateStreamResult(self, sendData, stream):
        result = {}
        for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
                                                   sum(sendData.values()))):
            result[i] = result.get(i, 0) + 1

        self.assertEqual(sendData, result)

    def test_kafka_stream(self):
        """Test the Python Kafka stream API."""
        topic = self._randomTopic()
        sendData = {"a": 3, "b": 5, "c": 10}

        self._kafkaTestUtils.createTopic(topic)
        self._kafkaTestUtils.sendMessages(topic, sendData)

        stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                         "test-streaming-consumer", {topic: 1},
                                         {"auto.offset.reset": "smallest"})
        self._validateStreamResult(sendData, stream)

更多的Flume,动静和其他的例子 pyspark.streaming.tests 模块。

相关问题