我对kafka和ksqldb都是新手,我正在尝试评估它们是否适合我正在从事的项目。如果可能的话,我希望使用ksqldb,而不是kafka流。
我正在考虑使用它来跟踪互联网会话中的数据使用情况。
数据格式还没有定义,但是它可能有几个字段,这些字段组合起来将标识一个服务和几个计数器。计数器只会增加-它们显示每个会话中各种类型的总数据。
我每小时会得到几次统计数据,大约4次,我希望每一个计数器每一次都能得到一个小时增量。
因此,例如,对于一个会话,我可能会有这样一些数据(尽管每个记录有几个计数器,而且“时间”增量不一致,但这样更容易推理):
0h03: 0 (w/ start flag)
0h11: 5
0h26: 60
0h41: 150
0h56: 156
1h11: 300
1h26: 301
1h41: 500
1h56: 560
2h11: 580
2h26: 601
2h41: 630
2h56: 685
我想说的是:
0h: 156 (i.e. 156 - 0/start)
1h: 404 (i.e. 560 - 156)
2h: 125 (i.e. 685 - 560)
我可以想象这样的情况:我确定每个记录相对于前一个记录的增量,因为这样每小时的聚合,我理解,是微不足道的。我不确定的是如何比较像那样的连续记录。
我考虑过一个解决方案,比如说0h45-2h,然后执行max()-min(),但是由于时间增量不一致,所以0h的最后一个计数器完全有可能是在0h44处加时间戳的,同样有可能在0h46处有一个计数器,在0h59处有另一个计数器-在这种情况下,我们要与0h59计数器进行比较,不是0h46,但min()将返回0h46值。
在某些情况下,输入数据可能会被无序接收,可能会延迟几个小时——因此,我无法在输入时向每条记录添加某种递增计数器。我见过这种行比较问题的解决方案,它使用这样的计数器。我希望我可以在ksqldb中使用某种排序函数来处理这个问题,而且我已经看到窗口支持宽限期,这在这里可能也适用于我。
不管怎样-我就在这里。我不是在寻找一个现成的解决方案,但是,为kafka/ksqldb新手指明了正确的方向,或者可能是一些“这是不可能的,你需要做一些java开发并用streams来做这件事”——假设这是可能的!
暂无答案!
目前还没有任何答案,快来回答吧!