kafka流链式leftjoin-在新消息之后再次处理以前的旧消息

vi4fp9gy  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(220)

我有一个流是其他流的组合

final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
    .leftJoin(
        compositeFundTable, 
        (CompositeImc cimc, CompositeFund cf) -> {
            CompositeInfo newCandidate = new CompositeInfo();
            if (cimc != null) {
                newCandidate.imcName = cimc.imcName;
                newCandidate.imcID = cimc.imcID;                                                                    
                if (cf != null) {
                    newCandidate.investments = cf.investments;
                }
            }
            return newCandidate;
        })
    .leftJoin(
        compositeGeographyTable, 
        (CompositeInfo cinfo, CompositeGeography cg) -> {
            if (cg != null) {
                cinfo.regions = cg.regions;
            }
            return cinfo;
        })
    .leftJoin(
        compositeSectorTable, 
        (CompositeInfo cinfo, CompositeSector cs) -> {
            if (cs != null) {
                cinfo.sectors = cs.sectors;
            }
            return cinfo;
        })
    .leftJoin(
        compositeClusterTable, 
        (CompositeInfo cinfo, CustomCluster cc) -> {
            if (cc != null && cc.clusters != null) {
                cinfo.clusters = cc.clusters;
            }
            return cinfo;
        })
    .leftJoin(
        compositeAlphaClusterTable, 
        (CompositeInfo cinfo, CompositeAlphaCluster cac) -> {
            if (cac != null) {
                cinfo.alphaClusters = cac.alphaClusters;
            };
            return cinfo;
        },
        Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
            .withKeySerde(Serdes.Long())
            .withValueSerde(compositeInfoSerde));

我的问题涉及compositeinfo和customcluster之间的左连接。customcluster如下所示

KTable<Long, CustomCluster> compositeClusterTable = builder
    .stream(
        SUB_TOPIC_COMPOSITE_CLUSTER,
        Consumed.with(Serdes.Long(), compositeClusterSerde))
    .filter((k, v) -> v.clusters != null)
    .groupByKey(Serialized.with(Serdes.Long(), compositeClusterSerde))
    .reduce((aggValue, newValue) -> newValue);

自定义群集中的消息如下所示

CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=MyName]]]

所以我将这个对象中的hashmap集群分配给在compositeid上连接的compositeinfo对象中的集群。
我所看到的是,一个customcluster消息为一个给定的compositeid传入,并且被正确地处理,但是包含前一个集群的旧消息(我仍在研究这个)被再次处理。通过挖掘问题发生在Kafka内部的ktablektablerightjoin

public void process(final K key, final Change<V1> change) {
    // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
    if (key == null) {
        return;
    }

    final R newValue;
    R oldValue = null;

    final V2 value2 = valueGetter.get(key);
    if (value2 == null) {
        return;
    }

    newValue = joiner.apply(change.newValue, value2);

    if (sendOldValues) {
        oldValue = joiner.apply(change.oldValue, value2);
    }

    context().forward(key, new Change<>(newValue, oldValue));
}

当joine第一次返回时,newvalue被正确更新。但是代码随后转到sendoldvalues块,一旦joiner返回,newvalue就是updategain,但这次是使用旧集群值。
下面是我的问题:
为什么在使用oldvalue第二次调用joiner时newvalues会得到更新
有没有办法关闭sendoldvalues
我的左连接是否与此有关。我知道以前版本的Kafka有一个链锁错误。但现在我是1.0了
更新:我发现了另一件事。如果我将join向上移动到连接链并删除其他连接,sendoldvalues仍然为false。所以如果我有如下的东西:

final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
    .leftJoin(
        compositeFundTable, 
        (CompositeImc cimc, CompositeFund cf) -> {
            CompositeInfo newCandidate = new CompositeInfo();
            if (cimc != null) {
                newCandidate.imcName = cimc.imcName;
                newCandidate.imcID = cimc.imcID;
                if (cf != null) {
                    newCandidate.investments = cf.investments;
                }
            }   
            return newCandidate;
        })
    .leftJoin(
        compositeClusterTable, 
        (CompositeInfo cinfo, CustomCluster cc) -> {
            if (cc != null && cc.clusters != null) {
                cinfo.clusters = cc.clusters;
            }
            return cinfo;
        },
        Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
          .withKeySerde(Serdes.Long())
          .withValueSerde(compositeInfoSerde));

这给了我正确的结果。但我认为,如果在这之后再放置更多的链式连接,它们可能会显示相同的错误行为。
在这一点上我什么都不确定,但我认为我的问题在于链式leftjoin和计算oldvalue的行为。还有其他人遇到过这个问题吗?
更新
在深入挖掘之后,我意识到森德的价值观是Kafka的内在原因,而不是我所经历的问题的根源。我的问题是,当valuejoiner for oldvalue返回时,newvalue会发生变化,我不知道这是否是由于对java对象的引用传递赋值造成的
这就是传入对象的外观

CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=Sunil 2]]]

集群是一个 HashSet<Cluster> clusters = new HashSet<Cluster>(); 然后将其连接到一个对象

CompositeInfo [compositeName=BUCKET_NM-280, compositeID=280, imcID=19651, regions=null, sectors=null, clusters=[]]

这里的集群类型相同,但属于compositeinfo类
加入时,我将customcluster对象的簇指定给compositeinfo对象

(CompositeInfo cinfo, CustomCluster cc) -> {
    if (cc != null && cc.clusters != null) {
        cinfo.clusters = cc.clusters;
    }
    return cinfo;
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题