Kafka Streams join on a foreign key is not working

Posted by 2nc8po8w on 2021-06-05 in Kafka

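I am receiving an order stream from the Order topic and a stock stream from another topic, Stock, in real time. I just want to enrich each order with the latest product object. I have built a table from Stock, which is updated from another topic, the market-data source. The same applies to the order stream.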

    final KStream<String, OrderObject> orderStream = builder.stream(TOPIC_Agora); // key: OrderObject.orderid
    final KTable<String, StockMarketData> stockDataTable = builder.table(TOPIC_Stock_Data); // key: StockMarketData.product

    // I want to enrich all orders (arriving on the topic in real time) with the latest product value.
    // Re-key each order by its symbol so it can be joined against the stock table.
    KStream<String, OrderObject> orderByProduct = orderStream.map((k, v) -> new KeyValue<String, OrderObject>(v.getSymbol(), v));

    // StockEnrichedOrder holds an OrderObject and a StockMarketData
    KStream<String, StockEnrichedOrder> enrichedOrdersStream =
            orderByProduct.leftJoin(stockDataTable, new ValueJoiner<OrderObject, StockMarketData, StockEnrichedOrder>() {

        @Override
        public StockEnrichedOrder apply(OrderObject order, StockMarketData product) {
            return new StockEnrichedOrder(order, product);
        }
    });

    // enrichedOrdersStream works at startup but is NOT updated in real time afterwards

    KafkaStreams streams = new KafkaStreams(topology, propAll);
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));

Kafka Streams configuration

    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, serverURI);
    settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
    settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerdeComp.class);
    settings.put("acks", "all");

Any idea what I am missing here?
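
One thing that may be worth checking is what "updated in real time" is expected to mean. In Kafka Streams, a KStream-KTable join is driven by the stream side only: a new record on the table's topic updates the table state but does not emit a join result by itself, and each order is joined against whatever the table holds when that order is processed. The sketch below is a plain-Java simulation of that behaviour, not Kafka Streams code. The class StreamTableJoinSketch, its methods, and the string output format are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Kafka dependency) of stream-table left-join semantics.
// Every name here is hypothetical and only loosely mirrors the types in the question.
class StreamTableJoinSketch {

    // Latest price per product key; plays the role of the KTable state.
    private final Map<String, Double> stockTable = new HashMap<>();
    // Everything the join has emitted so far.
    private final List<String> output = new ArrayList<>();

    // Table-side record: updates state only and emits nothing.
    void onStockUpdate(String symbol, double price) {
        stockTable.put(symbol, price);
    }

    // Stream-side record: looks up the current table value and emits one result.
    // With a left join the order is emitted even if no stock value exists yet.
    void onOrder(String orderId, String symbol) {
        Double price = stockTable.get(symbol); // null when the table has no entry
        output.add(orderId + ":" + symbol + ":" + price);
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        join.onOrder("o1", "ABC");        // no stock data yet, joined with null
        join.onStockUpdate("ABC", 10.0);  // table update alone emits nothing
        join.onOrder("o2", "ABC");        // sees the price 10.0
        join.onStockUpdate("ABC", 12.5);  // o1 and o2 are not re-emitted
        join.output().forEach(System.out::println);
    }
}
```

If the goal is for orders that were already emitted to be enriched again whenever the stock value changes, a stream-table join would not do that on its own; a table-table join, which emits on updates from either side, may be closer to that requirement. If new orders are simply not appearing in the output at all, the cause is likely elsewhere.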
