例如,我有一个购物车的kstream,每个购物车都有一个产品id的列表。此外,还有一个产品流。我该怎么把他们连在一起?
public class ShoppingCart {
List<ProductKey> productKeys;
}
public class Product {
ProductKey key;
String name;
}
public class ProductKey {
String id;
}
KStream<String, ShoppingCart> shoppingCartKStream;
KStream<ProductKey, Product> productKStream;
我想要的结果是这样的
KStream<String, ShoppingCartWithProducts> joinedStream;
public class ShoppingCartWithProducts {
List<Product> products;
}
有没有一个简单的方法来存档这个?
编辑:我知道有办法,但我觉得太复杂了。简单地说:
我需要绘制购物车的平面图
然后我可以将结果与产品kstream连接起来
对中间结果进行分组和聚合
最后加入shoppingcart kstream
KStream<String, ProductKey> productKeyStream = shoppingCartKStream
.flatMap((key, shoppingCart) -> shoppingCart.productKeys.stream()
.map(productKey -> KeyValue.pair(key, productKey))
.collect(Collectors.toList())
);
KTable<String, Product> productStreamWithShoppingCartKey = productKeyStream.toTable()
.join(
productKStream.toTable(),
productKey -> productKey,
(productKey, product) -> product
);
KTable<String, ArrayList<Product>> productListStream = productStreamWithShoppingCartKey
.groupBy(KeyValue::pair)
.aggregate(
(Initializer<ArrayList<Product>>) ArrayList::new,
(key, value, aggregate) -> addProductToList(aggregate, value),
(key, value, aggregate) -> removeProductFromList(aggregate, value)
);
KStream<String, ShoppingCartWithProducts> shoppingCartWithProductsKStream = shoppingCartKStream.join(
productListStream,
(shoppingCart, productList) -> new ShoppingCartWithProducts(productList)
);
当然,它非常简单,我还需要处理墓碑等。
1条答案
按热度按时间rekjcdws1#
在你定义你之后
StreamsBuilder
这是流dsl的入口点。您可以使用
JoinWindows.of(Duration.ofMinutes(5))
. 否则,必须使用相同类型的两个流的键kafka-stream
无法比较不同类型的键。它就像一个数据库连接。所以,我用String
为了ShoppingCart
以及Product
. 然后.join(...
运算符匹配相同键的事件,您可以构建新事件ShoppingCartWithProducts
.你可以在这里找到更详细的信息。