scala—flink中处理的数据不保存在cassandra数据库中

xmd2e60i  于 2021-07-15  发布在  Flink
关注(0)|答案(0)|浏览(296)

我正在实现通过apachekafka从外部环境获取数据的应用程序。这些数据首先Map到对象,然后传递到进程(时间窗口)(请看下面的代码)

  1. val busDataStream = env.addSource(kafkaConsumer)
  2. .filter { _.nonEmpty}
  3. .flatMap(line => JsonMethods.parse(line).toOption)
  4. .map(_.extract[BusModel])
  5. class CustomProcess() extends ProcessWindowFunction[BusModel, BusModel, String, TimeWindow] {
  6. lazy val busState: ValueState[BusModel] = getRuntimeContext.getState(
  7. new ValueStateDescriptor[BusModel]("BusModel state", classOf[BusModel])
  8. )
  9. override def process(key: String, context: Context, elements: Iterable[BusModel], out: Collector[BusModel]): Unit = {
  10. for (e <- elements) {
  11. if (busState.value() != null) {
  12. out.collect(busState.value())
  13. val result: Double = calculateSomething(e, busState.value())
  14. }
  15. busState.update(e)
  16. println(s"BusState: ${busState.value()}")
  17. }
  18. }
  19. }
  20. val dataStream: DataStream[BusModel] = busDataStream
  21. .keyBy(_.VehicleNumber)
  22. .timeWindow(Time.seconds(10))
  23. .process(new CustomCountProc)

在准备好新的信息后,我想把这些数据输入Cassandra数据库。我尝试使用连接器实现这个值,但不幸的是,新记录没有显示在数据库中。。。
我还添加了一个createtypeinformation方法,该方法应该将所选对象的数据Map到数据库中的列类型,但不幸的是,这没有帮助。

  1. createTypeInformation[(String, Double, Double, Double)]
  2. val sinkStream = dataStream
  3. .map(busRide => (
  4. java.util.UUID.randomUUID.toString,
  5. busRide.valueA,
  6. busRide.valueB,
  7. busRide.valueC,
  8. ))
  9. CassandraSink.addSink(sinkStream)
  10. .setQuery("INSERT INTO transport.bus_flink_speed(" +
  11. "\"FirstColumn\", " +
  12. "\"SecondColumn " +
  13. "\"ThirdColumn\", " +
  14. "\"ForthColumn\")" +
  15. " values (?, ?, ?, ?);")
  16. .setHost("localhost")
  17. .build()
  18. env.execute("Flink Kafka Example")

有人知道为什么这样不行吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题