我正在实现通过apachekafka从外部环境获取数据的应用程序。这些数据首先Map到对象,然后传递到进程(时间窗口)(请看下面的代码)
val busDataStream = env.addSource(kafkaConsumer)
.filter { _.nonEmpty}
.flatMap(line => JsonMethods.parse(line).toOption)
.map(_.extract[BusModel])
class CustomProcess() extends ProcessWindowFunction[BusModel, BusModel, String, TimeWindow] {
lazy val busState: ValueState[BusModel] = getRuntimeContext.getState(
new ValueStateDescriptor[BusModel]("BusModel state", classOf[BusModel])
)
override def process(key: String, context: Context, elements: Iterable[BusModel], out: Collector[BusModel]): Unit = {
for (e <- elements) {
if (busState.value() != null) {
out.collect(busState.value())
val result: Double = calculateSomething(e, busState.value())
}
busState.update(e)
println(s"BusState: ${busState.value()}")
}
}
}
val dataStream: DataStream[BusModel] = busDataStream
.keyBy(_.VehicleNumber)
.timeWindow(Time.seconds(10))
.process(new CustomCountProc)
在准备好新的信息后,我想把这些数据输入Cassandra数据库。我尝试使用连接器实现这个值,但不幸的是,新记录没有显示在数据库中。。。
我还添加了一个createtypeinformation方法,该方法应该将所选对象的数据Map到数据库中的列类型,但不幸的是,这没有帮助。
createTypeInformation[(String, Double, Double, Double)]
val sinkStream = dataStream
.map(busRide => (
java.util.UUID.randomUUID.toString,
busRide.valueA,
busRide.valueB,
busRide.valueC,
))
CassandraSink.addSink(sinkStream)
.setQuery("INSERT INTO transport.bus_flink_speed(" +
"\"FirstColumn\", " +
"\"SecondColumn " +
"\"ThirdColumn\", " +
"\"ForthColumn\")" +
" values (?, ?, ?, ?);")
.setHost("localhost")
.build()
env.execute("Flink Kafka Example")
有人知道为什么这样不行吗?
暂无答案!
目前还没有任何答案,快来回答吧!