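I receive a stream of orders from the Order topic and, in real time, a stream of stock data from another topic, Stock. All I want is to enrich each order with the latest product object. I created a table for Stock, which is updated from another topic fed by the market-data source. The order stream is fed the same way.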
final KStream<String, OrderObject> orderStream = builder.stream(TOPIC_Agora); //OrderObject.orderid is key
final KTable<String, StockMarketData> stockDataTable = builder.table(TOPIC_Stock_Data); //StockMarketData.product is key
// I want to enrich all orders (arriving on the topic in real time) with the latest product value
KStream<String, OrderObject> orderByProduct = orderStream.map((k,v)-> new KeyValue<String,OrderObject>(v.getSymbol(), v));
// StockEnrichedOrder wraps an OrderObject and a StockMarketData
KStream<String, StockEnrichedOrder> enrichedOrdersStream =
    orderByProduct.leftJoin(stockDataTable, new ValueJoiner<OrderObject, StockMarketData, StockEnrichedOrder>() {
        @Override
        public StockEnrichedOrder apply(OrderObject order, StockMarketData product) {
            // product is null when the table has no entry for this key yet (left join)
            return new StockEnrichedOrder(order, product);
        }
    });
// enrichedOrdersStream is populated on startup but is NOT updated in real time afterwards
KafkaStreams streams = new KafkaStreams(topology, propAll);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
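For reference, here is a minimal sketch of how a KStream-KTable left join behaves, modeled in plain Java rather than with the Kafka Streams API. It assumes the documented semantics: only stream-side records trigger the join, while table-side records only update the lookup state. The class and method names (`StreamTableJoinSketch`, `onTableRecord`, `onStreamRecord`) are hypothetical and exist only for illustration, and orders and market data are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a KStream-KTable left join; this is NOT the Kafka Streams API.
public class StreamTableJoinSketch {
    // Stands in for the KTable: the latest value seen per key.
    private final Map<String, String> table = new HashMap<>();
    // Stands in for the joined output stream.
    private final List<String> output = new ArrayList<>();

    // A table-side record only updates the lookup state; nothing is emitted.
    public void onTableRecord(String product, String marketData) {
        table.put(product, marketData);
    }

    // A stream-side record triggers the join against the table's current state.
    public void onStreamRecord(String product, String order) {
        String marketData = table.get(product); // null if no match yet (left join)
        output.add(order + "+" + marketData);
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        join.onStreamRecord("AAPL", "order-1");  // no table entry yet
        join.onTableRecord("AAPL", "price=100"); // updates state only
        join.onStreamRecord("AAPL", "order-2");  // sees price=100
        join.onTableRecord("AAPL", "price=101"); // order-2 is NOT re-emitted
        join.onStreamRecord("AAPL", "order-3");  // sees price=101
        // prints [order-1+null, order-2+price=100, order-3+price=101]
        System.out.println(join.output());
    }
}
```

In this model a new market-data record never produces output on its own; an enriched order appears only when an order arrives, and it carries whatever the table held for that key at that moment.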
Kafka Streams configuration:
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, serverURI);
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerdeComp.class);
settings.put("acks", "all");
Any idea what I am missing here?