kafka将单个日志事件行聚合为组合日志事件

wfveoks0  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(379)

我在用Kafka处理日志事件。我有Kafka连接和Kafka简单连接器和流转换流的基本知识。
现在我有一个日志文件,其结构如下:

timestamp event_id event

日志事件有多个日志行,这些日志行由事件id连接(例如邮件日志)
例子:

1234 1 START
1235 1 INFO1
1236 1 INFO2
1237 1 END

一般来说,有多个事件:
例子:

1234 1 START
1234 2 START
1235 1 INFO1
1236 1 INFO2
1236 2 INFO3
1237 1 END
1237 2 END

时间窗口(从开始到结束)可能长达5分钟。
因此,我想要一个像

event_id combined_log

例子:

1 START,INFO1,INFO2,END
2 START,INFO2,END

实现这一目标的正确工具是什么?我试着用Kafka流来解决这个问题,但我知道怎么。。

qq24tv8q

qq24tv8q1#

在您的用例中,您基本上是基于消息有效负载重建会话或事务。目前还没有内置的、随时可用的对此类功能的支持。但是,您可以使用kafka的streams api中的处理器api部分自己实现这个功能。可以编写自定义处理器,使用状态存储跟踪给定密钥的会话/事务的启动、添加和结束时间。
邮件列表中的一些用户一直在做iirc,尽管我不知道我可以给您指出的现有代码示例。
您需要注意的是正确处理无序数据。在上面的示例中,您按正确的顺序列出了所有输入数据:

1234 1 START
1234 2 START
1235 1 INFO1
1236 1 INFO2
1236 2 INFO3
1237 1 END
1237 2 END

但实际上,消息/记录可能会无序到达,就像这样(我只显示带有键的消息) 1 为了简化示例):

1234 1 START
1237 1 END
1236 1 INFO2
1235 1 INFO1

即使发生这种情况,我知道在您的用例中,您仍然希望将此数据解释为: START -> INFO1 -> INFO2 -> END 而不是 START -> END (忽略/删除) INFO1 以及 INFO2 =数据丢失)或 START -> END -> INFO2 -> INFO1 (顺序不正确,可能也违反了语义约束)。

相关问题