使用 scalatest 嵌入式 Kafka 对 Flink 和 Kafka 进行集成测试

wpcxdonn  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(533)

我想和flink和kafka一起进行集成测试。这个过程是从Kafka中读取数据,用flink进行一些操作,然后把数据流放到Kafka中。
我想从头到尾测试这个过程。现在我使用scalatest嵌入式Kafka。
我举了一个例子,我尽量简单:

  1. import java.util.Properties
  2. import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema
  4. import org.apache.flink.api.common.typeinfo.TypeInformation
  5. import org.apache.flink.streaming.api.functions.sink.SinkFunction
  6. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  7. import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
  8. import org.scalatest.{Matchers, WordSpec}
  9. import scala.collection.mutable.ListBuffer
  10. object SimpleFlinkKafkaTest {
  11. class CollectSink extends SinkFunction[String] {
  12. override def invoke(string: String): Unit = {
  13. synchronized {
  14. CollectSink.values += string
  15. }
  16. }
  17. }
  18. object CollectSink {
  19. val values: ListBuffer[String] = ListBuffer.empty[String]
  20. }
  21. val kafkaPort = 9092
  22. val zooKeeperPort = 2181
  23. val props = new Properties()
  24. props.put("bootstrap.servers", "localhost:" + kafkaPort.toString)
  25. props.put("schema.registry.url", "localhost:" + zooKeeperPort.toString)
  26. val inputString = "mystring"
  27. val expectedString = "MYSTRING"
  28. }
  29. class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {
  30. "runs with embedded kafka" should {
  31. "work" in {
  32. implicit val config = EmbeddedKafkaConfig(
  33. kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
  34. zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort
  35. )
  36. withRunningKafka {
  37. publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest.inputString)
  38. val env = StreamExecutionEnvironment.getExecutionEnvironment
  39. env.setParallelism(1)
  40. val kafkaConsumer = new FlinkKafkaConsumer011(
  41. "input-topic",
  42. new SimpleStringSchema,
  43. SimpleFlinkKafkaTest.props
  44. )
  45. implicit val typeInfo = TypeInformation.of(classOf[String])
  46. val inputStream = env.addSource(kafkaConsumer)
  47. val outputStream = inputStream.map(_.toUpperCase)
  48. val kafkaProducer = new FlinkKafkaProducer011(
  49. "output-topic",
  50. new SimpleStringSchema(),
  51. SimpleFlinkKafkaTest.props
  52. )
  53. outputStream.addSink(kafkaProducer)
  54. env.execute()
  55. consumeFirstStringMessageFrom("output-topic") shouldEqual SimpleFlinkKafkaTest.expectedString
  56. }
  57. }
  58. }
  59. }

我有个错误,所以我加了一行 implicit val typeInfo = TypeInformation.of(classOf[String]) 但我真的不明白我为什么要这么做。
现在这段代码无法正常工作:运行时不报错,但一直不结束,也不产生任何结果。
有人知道原因吗?或者有没有更好的方法来测试这种管道?
谢谢!
编辑:添加 env.execute() 之后,报错信息发生了变化。

esbemjvw

esbemjvw1#

我想出了一个简单的解决办法。
其目的是:
启动kafka嵌入式服务器
创建测试主题(这里是输入和输出)
在 Future 中启动 Flink 作业,以避免阻塞主线程
将消息发布到输入主题
检查输出主题的结果
以及工作原型:

  1. import java.util.Properties
  2. import org.apache.flink.streaming.api.scala._
  3. import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
  4. import org.apache.flink.api.common.serialization.SimpleStringSchema
  5. import org.apache.flink.core.fs.FileSystem.WriteMode
  6. import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
  7. import org.scalatest.{Matchers, WordSpec}
  8. import scala.concurrent.ExecutionContext.Implicits.global
  9. import scala.concurrent.Future
  10. class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {
  11. "runs with embedded kafka on arbitrary available ports" should {
  12. val env = StreamExecutionEnvironment.getExecutionEnvironment
  13. "work" in {
  14. val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2182)
  15. val properties = new Properties()
  16. properties.setProperty("bootstrap.servers", "localhost:9092")
  17. properties.setProperty("zookeeper.connect", "localhost:2182")
  18. properties.setProperty("group.id", "test")
  19. properties.setProperty("auto.offset.reset", "earliest")
  20. val kafkaConsumer = new FlinkKafkaConsumer011[String]("input", new SimpleStringSchema(), properties)
  21. val kafkaSink = new FlinkKafkaProducer011[String]("output", new SimpleStringSchema(), properties)
  22. val stream = env
  23. .addSource(kafkaConsumer)
  24. .map(_.toUpperCase)
  25. .addSink(kafkaSink)
  26. withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
  27. createCustomTopic("input")
  28. createCustomTopic("output")
  29. Future{env.execute()}
  30. publishStringMessageToKafka("input", "Titi")
  31. consumeFirstStringMessageFrom("output") shouldEqual "TITI"
  32. }
  33. }
  34. }
  35. }
展开查看全部

相关问题