Kafka Streams GlobalKTable join

7gcisfzg  posted on 2021-06-04 in Kafka

I have a KStream whose values are deserialized into a POJO like this:

public class FinancialMessage {

public String user_id;
public String stock_symbol;
public String exchange_id;

}

Here is what a GlobalKTable record looks like:

public class CompanySectors {

public String company_id;
public String company_name;
public String tckr;
public String sector_cd;
}

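I want to join the KStream's stock_symbol field with the KTable's tckr field. Is this possible? I want to create a new EnrichedMessage object and then stream it to another topic. I have code like the one below, but I am getting a NullPointerException.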

Exception in thread "trade-enrichment-stream-0c7e7782-4217-4450-8086-21871b4ebc45-StreamThread-1" java.lang.NullPointerException
    at com.domain.EnrichedMessage.<init>(EnrichedMessage.java:51)
    at com.domain.TradeEnrichmentTopology.lambda$3(TradeEnrichmentTopology.java:73)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:79)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:101)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

Here is what the code snippet looks like.

KStream<String, FinancialMessage> financialMessageStream =
    builder.stream(
        INCOMING_TOPIC,
        Consumed.with(Serdes.String(), financialMessageSerde)
    );

GlobalKTable<String, CompanySectors> companySectorsStore =
    builder.globalTable(
        KTABLE_TOPIC,
        Consumed.with(Serdes.String(), companySectorsSerde)
    );

KStream<String, EnrichedMessage> enrichedStream = financialMessageStream.leftJoin(
    companySectorsStore,
    (financialMessageKey, financialMessageValue) -> financialMessageValue.stock_symbol,
    (financialMessageValue, companySectorsValue) -> new EnrichedMessage(financialMessageValue, companySectorsValue)
);

enrichedStream.to(
    OUTGOING_TOPIC,
    Produced.with(Serdes.String(), enrichedMessageSerde));

I think there may be a mistake in my leftJoin logic.

yks3o0rb  #1

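When performing a left join, you can assume the record from the left stream is not null; however, you cannot assume the right-side GlobalKTable has a record matching the given key, so the table-side value passed to your joiner may be null. In your case, when you instantiate new EnrichedMessage(financialMessageValue, companySectorsValue), are you sure companySectorsValue is not null? If it is null, are you handling that correctly? Your NPE appears to be thrown inside the EnrichedMessage constructor (EnrichedMessage.java:51 in the stack trace), so make sure that code allows for companySectorsValue being null.
Also, make sure the GlobalKTable is pre-populated before any join logic runs.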
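
To illustrate the null case described above, here is a minimal sketch in plain Java with no Kafka dependency. The question does not show EnrichedMessage, so its fields here are an assumption, and the class LeftJoinNullDemo with its leftJoin helper is a hypothetical stand-in that uses a Map lookup in place of the GlobalKTable. The point is only that the constructor has to tolerate a null CompanySectors when no table record matches.

```java
import java.util.HashMap;
import java.util.Map;

// POJOs as shown in the question.
class FinancialMessage {
    public String user_id;
    public String stock_symbol;
    public String exchange_id;
}

class CompanySectors {
    public String company_id;
    public String company_name;
    public String tckr;
    public String sector_cd;
}

// Assumed shape: the question does not show EnrichedMessage's fields.
class EnrichedMessage {
    public String user_id;
    public String stock_symbol;
    public String exchange_id;
    public String company_name; // stays null when no table record matched
    public String sector_cd;    // stays null when no table record matched

    EnrichedMessage(FinancialMessage message, CompanySectors sectors) {
        this.user_id = message.user_id;
        this.stock_symbol = message.stock_symbol;
        this.exchange_id = message.exchange_id;
        // Guard against the unmatched left-join case before dereferencing.
        if (sectors != null) {
            this.company_name = sectors.company_name;
            this.sector_cd = sectors.sector_cd;
        }
    }
}

// Hypothetical stand-in for the join: a Map plays the role of the GlobalKTable.
class LeftJoinNullDemo {
    static EnrichedMessage leftJoin(FinancialMessage message,
                                    Map<String, CompanySectors> table) {
        // Map.get returns null on a miss, like the table side of a left join.
        CompanySectors match = table.get(message.stock_symbol);
        return new EnrichedMessage(message, match);
    }

    public static void main(String[] args) {
        Map<String, CompanySectors> table = new HashMap<>();
        CompanySectors acme = new CompanySectors();
        acme.tckr = "ACME";
        acme.company_name = "Acme Corp";
        acme.sector_cd = "TECH";
        table.put(acme.tckr, acme); // keyed by tckr so stock_symbol can look it up

        FinancialMessage matched = new FinancialMessage();
        matched.stock_symbol = "ACME";
        FinancialMessage unmatched = new FinancialMessage();
        unmatched.stock_symbol = "UNKNOWN";

        System.out.println(leftJoin(matched, table).sector_cd);
        System.out.println(leftJoin(unmatched, table).sector_cd);
    }
}
```

In the actual topology, the same guard would live either in the EnrichedMessage constructor or in the joiner lambda passed to leftJoin. Note also that the lookup only finds a match when the table's record key is the ticker, which is why the sketch keys the map by tckr.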
