ksql-使用geo\u距离计算来自2条消息的距离

nhaq1z21  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(338)

我有一个Kafka主题,主题中的每条消息都有lat/lon和事件时间戳。创建了一个引用主题的流,并希望使用地理距离计算两点之间的距离。例子

GpsDateTime            lat              lon
2016-11-30 22:38:36,    32.685757,  -96.735942
2016-11-30 22:39:07,    32.687347,  -96.732841
2016-11-30 22:39:37,    32.68805,   -96.729726

我想创建一个新的流在上面的流和丰富它的距离。

GpsDateTime            lat              lon          Distance
2016-11-30 22:38:36,    32.685757,  -96.735942        0
2016-11-30 22:39:07,    32.687347,  -96.732841        0.340
2016-11-30 22:39:37,    32.68805,   -96.729726        0.302

使用ksql有可能达到预期的结果吗?或者如何在处理新消息时引用以前的消息?

idv4meu8

idv4meu81#

首先,这些读数来自某种装置吗?如果是这样的话,你有一个唯一的id(uuid)给他们吗?我会把它放到你的小溪里 UUID, GpsDateTime, lat, lon .
你需要创建一个相当基本的Kafka流应用程序。在这个应用程序中,您将把流中的最新读数存储到storebuilder中。然后,当从kafka收到新消息时,您将检索这个最新值,进行计算,然后将新的lat、long值存储到storebuilder中。
当然,我不清楚你是否只想有一个lat,long值,并且你所有的后续值都是从第一个读数开始计算的。或者,如果你想有一个滚动计算,你总是比较最近和当前读数之间的距离。
不管怎样,您可以在以下位置实际看到此代码:https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/statestoresinthedslintegrationtest.java
这个例子是一个字数统计的例子,但是可以根据你的用例快速转换。
静态最终类wordcounttransformersupplier(第78行)将成为latlongdistancecomputation。
您可以使用适当的类型(无论您将lat/lon存储为什么)创建storebuilder(第154行)。
第165行是实际从流入的值流中读取项的位置。
当然,您还需要编辑inputOpic和outputOpic(第66-67行)以及其他一些内容。

相关问题