我是Kafka流api的新手,我正在尝试创建一个ktable。我有一个输入主题: s-order-topic
,这是一条json格式的消息,如下所示。
{ "current_ts": "2019-12-24 13:16:40.316952",
"primary_keys": ["ID"],
"before": null,
"tokens": {"txid":"3.17.2493",
"csn":"64913009"},
"op_type":"I",
"after": { "CODE":"AAAA41",
"STATUS":"COMPLETED",
"ID":24},
"op_ts":"2019-12-24 13:16:40.316941",
"table":"S_ORDER"}
我阅读了来自这个主题的消息,我想创建一个以字段作为键的ktable "after":"ID"
值中的所有字段 "after"
字段(除 "ID"
).
只有在使用默认聚合函数(即count)时,我才成功地创建了ktable。但是我很难创建自己的聚合函数。下面是我试图创建ktable的部分代码。
KTable<Long, String> s_table = builder.stream("s-order-topic", Consumed.with(Serdes.Long(),Serdes.String()))
.mapValues(value -> {
String time;
JSONObject json = new JSONObject(value);
if (json.getString("op_type").equals("I")) {
time = "after";
}else {
time = "before";
}
JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
return json2.toString();
})
.groupBy((key, value) -> {
JSONObject json = new JSONObject(value);
return json.getLong("ID");
}, Grouped.with(Serdes.Long(), Serdes.String()))
.aggregate( ... );
如何实现这个ktable?
我处理问题的方法正确吗?
(mapvalues->仅保留“before”/“after”字段。groupby->使id成为消息的键。聚合->?)
1条答案
按热度按时间n1bvdmb61#
我为我的案子想出了解决办法。我实现了ktable,如下所示:
这个
aggregate
函数不适合这种情况,而是我用了reduce
功能。控制台使用者的输出如下所示: