如何在kstream(kafka streams)中连接列表的每个元素

e5njpo68  于 2021-07-09  发布在  Java
关注(0)|答案(1)|浏览(399)

例如,我有一个购物车的kstream,每个购物车都有一个产品id的列表。此外,还有一个产品流。我该怎么把他们连在一起?

public class ShoppingCart {
    List<ProductKey> productKeys;
}

public class Product {
    ProductKey key;
    String name;
}

public class ProductKey {
    String id;
}

KStream<String, ShoppingCart> shoppingCartKStream;
KStream<ProductKey, Product> productKStream;

我想要的结果是这样的

KStream<String, ShoppingCartWithProducts> joinedStream;
public class ShoppingCartWithProducts {
    List<Product> products;
}

有没有一个简单的方法来存档这个?
编辑:我知道有办法,但我觉得太复杂了。简单地说:
我需要绘制购物车的平面图
然后我可以将结果与产品kstream连接起来
对中间结果进行分组和聚合
最后加入shoppingcart kstream

KStream<String, ProductKey> productKeyStream = shoppingCartKStream
        .flatMap((key, shoppingCart) -> shoppingCart.productKeys.stream()
                .map(productKey -> KeyValue.pair(key, productKey))
                .collect(Collectors.toList())
        );

KTable<String, Product> productStreamWithShoppingCartKey = productKeyStream.toTable()
        .join(
                productKStream.toTable(),
                productKey -> productKey,
                (productKey, product) -> product
        );

KTable<String, ArrayList<Product>> productListStream = productStreamWithShoppingCartKey
        .groupBy(KeyValue::pair)
        .aggregate(
                (Initializer<ArrayList<Product>>) ArrayList::new,
                (key, value, aggregate) -> addProductToList(aggregate, value),
                (key, value, aggregate) -> removeProductFromList(aggregate, value)
        );

KStream<String, ShoppingCartWithProducts> shoppingCartWithProductsKStream = shoppingCartKStream.join(
        productListStream,
        (shoppingCart, productList) -> new ShoppingCartWithProducts(productList)
);

当然,它非常简单,我还需要处理墓碑等。

rekjcdws

rekjcdws1#

在你定义你之后 StreamsBuilder 这是流dsl的入口点。

StreamsBuilder builder = new StreamsBuilder();

您可以使用 JoinWindows.of(Duration.ofMinutes(5)) . 否则,必须使用相同类型的两个流的键 kafka-stream 无法比较不同类型的键。它就像一个数据库连接。所以,我用 String 为了 ShoppingCart 以及 Product . 然后 .join(... 运算符匹配相同键的事件,您可以构建新事件 ShoppingCartWithProducts .

KStream<String, ShoppingCart> shoppingCartKStream = ...;
KStream<String, Product> productKStream = ...;

shoppingCartKStream.join(productKStream,
    (shop, prod) -> {
        log.info("ShoppingCart: {} with Product: {}", shop, prod);

        ShoppingCartWithProducts shoppingCartWithProducts = new ShoppingCartWithProducts();
        shoppingCartWithProducts.setShoppingCart(shop);
        shoppingCartWithProducts.setProduct(prod);
        return shoppingCartWithProducts;
    },
    JoinWindows.of(Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(),
        new JsonSerde<>(ShoppingCart.class),
        new JsonSerde<>(Product.class)))
   .foreach((k, v) -> log.info("ShoppingCartWithProducts ID: {}, value: {}", k, v));

你可以在这里找到更详细的信息。

相关问题