I have a stream that is a composite of several other streams:
final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
        .leftJoin(
                compositeFundTable,
                (CompositeImc cimc, CompositeFund cf) -> {
                    CompositeInfo newCandidate = new CompositeInfo();
                    if (cimc != null) {
                        newCandidate.imcName = cimc.imcName;
                        newCandidate.imcID = cimc.imcID;
                        if (cf != null) {
                            newCandidate.investments = cf.investments;
                        }
                    }
                    return newCandidate;
                })
        .leftJoin(
                compositeGeographyTable,
                (CompositeInfo cinfo, CompositeGeography cg) -> {
                    if (cg != null) {
                        cinfo.regions = cg.regions;
                    }
                    return cinfo;
                })
        .leftJoin(
                compositeSectorTable,
                (CompositeInfo cinfo, CompositeSector cs) -> {
                    if (cs != null) {
                        cinfo.sectors = cs.sectors;
                    }
                    return cinfo;
                })
        .leftJoin(
                compositeClusterTable,
                (CompositeInfo cinfo, CustomCluster cc) -> {
                    if (cc != null && cc.clusters != null) {
                        cinfo.clusters = cc.clusters;
                    }
                    return cinfo;
                })
        .leftJoin(
                compositeAlphaClusterTable,
                (CompositeInfo cinfo, CompositeAlphaCluster cac) -> {
                    if (cac != null) {
                        cinfo.alphaClusters = cac.alphaClusters;
                    }
                    return cinfo;
                },
                Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
                        .withKeySerde(Serdes.Long())
                        .withValueSerde(compositeInfoSerde));
My question concerns the leftJoin between CompositeInfo and CustomCluster. The CustomCluster table is built like this:
KTable<Long, CustomCluster> compositeClusterTable = builder
        .stream(
                SUB_TOPIC_COMPOSITE_CLUSTER,
                Consumed.with(Serdes.Long(), compositeClusterSerde))
        .filter((k, v) -> v.clusters != null)
        .groupByKey(Serialized.with(Serdes.Long(), compositeClusterSerde))
        .reduce((aggValue, newValue) -> newValue);
A CustomCluster message looks like this:
CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=MyName]]]
So in the join on compositeId, I assign the clusters HashSet from this object to the clusters field of the CompositeInfo object.

What I see is that a CustomCluster message for a given compositeId comes in and is processed correctly, but then an older message containing the previous clusters (I am still investigating where it comes from) is processed again. Digging in, the problem occurs inside Kafka's KTableKTableRightJoin:
public void process(final K key, final Change<V1> change) {
    // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
    if (key == null) {
        return;
    }

    final R newValue;
    R oldValue = null;

    final V2 value2 = valueGetter.get(key);
    if (value2 == null) {
        return;
    }

    newValue = joiner.apply(change.newValue, value2);

    if (sendOldValues) {
        oldValue = joiner.apply(change.oldValue, value2);
    }

    context().forward(key, new Change<>(newValue, oldValue));
}
When the joiner returns the first time, newValue is updated correctly. But the code then enters the sendOldValues block, and once the joiner returns again, newValue is updated a second time, this time with the old cluster values.
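To illustrate the effect I am describing, here is a minimal plain-Java sketch (with simplified stand-in classes, not the real Kafka types) of how a second joiner call can overwrite newValue when both calls receive the same mutable object fetched once from the value getter:

import java.util.HashSet;
import java.util.Set;

public class MutatingJoinerDemo {

    // Simplified stand-ins for the classes in the question.
    static class CompositeInfo {
        Set<String> clusters = new HashSet<>();
    }

    static class CustomCluster {
        Set<String> clusters = new HashSet<>();
    }

    // Mirrors the mutating ValueJoiner from the question.
    static CompositeInfo join(CompositeInfo cinfo, CustomCluster cc) {
        if (cc != null && cc.clusters != null) {
            cinfo.clusters = cc.clusters;
        }
        return cinfo;
    }

    public static void main(String[] args) {
        // value2 is fetched from the value getter once and handed to BOTH
        // joiner invocations, just like in the process() method above.
        CompositeInfo value2 = new CompositeInfo();

        CustomCluster newChange = new CustomCluster();
        newChange.clusters.add("Cluster[6041]");
        CustomCluster oldChange = new CustomCluster();
        oldChange.clusters.add("Cluster[previous]");

        CompositeInfo newValue = join(value2, newChange);
        System.out.println(newValue.clusters);    // [Cluster[6041]] -- looks right

        // The sendOldValues call mutates the very same instance...
        CompositeInfo oldValue = join(value2, oldChange);
        System.out.println(newValue.clusters);    // [Cluster[previous]] -- clobbered
        System.out.println(newValue == oldValue); // true: one shared object
    }
}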
Here are my questions:
1. Why does newValue get updated when the joiner is called a second time with oldValue?
2. Is there a way to turn off sendOldValues?
3. Does my leftJoin have anything to do with this? I know earlier versions of Kafka had a bug with chained joins, but I am on 1.0 now.
Update: I found another thing. If I move this join up the join chain and remove the other joins, sendOldValues remains false. So if I have something like the following:
final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
        .leftJoin(
                compositeFundTable,
                (CompositeImc cimc, CompositeFund cf) -> {
                    CompositeInfo newCandidate = new CompositeInfo();
                    if (cimc != null) {
                        newCandidate.imcName = cimc.imcName;
                        newCandidate.imcID = cimc.imcID;
                        if (cf != null) {
                            newCandidate.investments = cf.investments;
                        }
                    }
                    return newCandidate;
                })
        .leftJoin(
                compositeClusterTable,
                (CompositeInfo cinfo, CustomCluster cc) -> {
                    if (cc != null && cc.clusters != null) {
                        cinfo.clusters = cc.clusters;
                    }
                    return cinfo;
                },
                Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
                        .withKeySerde(Serdes.Long())
                        .withValueSerde(compositeInfoSerde));
This gives me the correct result. But I suspect that if more chained joins were added after it, they would show the same faulty behavior.

At this point I am not sure of anything, but I think my problem lies in the chained leftJoins and the way oldValue is computed. Has anyone else run into this?
Update

After digging deeper, I realized that sendOldValues is internal to Kafka and not the root cause of what I am experiencing. My problem is that newValue changes when the ValueJoiner for oldValue returns, and I suspect this is caused by Java's reference-assignment semantics for objects.
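That suspicion can be checked in isolation: Java assignment copies object references, not objects, so after something like cinfo.clusters = cc.clusters both fields point at one and the same HashSet. A minimal sketch (illustration only, not from the original code):

import java.util.HashSet;

public class ReferenceAssignmentDemo {
    public static void main(String[] args) {
        HashSet<String> ccClusters = new HashSet<>();
        HashSet<String> cinfoClusters = ccClusters;      // copies the reference only

        ccClusters.add("Cluster[6041]");
        System.out.println(cinfoClusters);               // [Cluster[6041]] -- same set
        System.out.println(cinfoClusters == ccClusters); // true
    }
}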
This is what the incoming object looks like:
CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=Sunil 2]]]
clusters is declared as HashSet<Cluster> clusters = new HashSet<Cluster>();
It is then joined to an object like this:
CompositeInfo [compositeName=BUCKET_NM-280, compositeID=280, imcID=19651, regions=null, sectors=null, clusters=[]]
clusters here is of the same type, but belongs to the CompositeInfo class.

In the join, I assign the clusters of the CustomCluster object to the CompositeInfo object:
(CompositeInfo cinfo, CustomCluster cc) -> {
    if (cc != null && cc.clusters != null) {
        cinfo.clusters = cc.clusters;
    }
    return cinfo;
}
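If the aliasing is indeed the cause, one conventional way around it (a sketch under that assumption, not a verified fix; the field list below is only what is visible in this question, and java.util.HashSet must be imported) is a joiner that never mutates its input and instead returns a fresh CompositeInfo with a defensive copy of the set:

(CompositeInfo cinfo, CustomCluster cc) -> {
    // Build a new result object instead of mutating cinfo in place, so the
    // newValue and oldValue joiner invocations cannot clobber each other.
    CompositeInfo result = new CompositeInfo();
    result.compositeName = cinfo.compositeName;
    result.compositeID = cinfo.compositeID;
    result.imcID = cinfo.imcID;
    result.imcName = cinfo.imcName;
    result.investments = cinfo.investments;
    result.regions = cinfo.regions;
    result.sectors = cinfo.sectors;
    result.alphaClusters = cinfo.alphaClusters;
    result.clusters = (cc != null && cc.clusters != null)
            ? new HashSet<>(cc.clusters) // defensive copy, not a shared reference
            : cinfo.clusters;
    return result;
}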