在我们的项目中,我们有使用Apache Flink的要求,它将使用SQL连接器将数据从Flink写入Mongo db
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/mongodb/
根据Flink的文档,当在DDL中没有定义主键的情况下在Mongo中插入文档时,Mongo连接器使用upsert语义。
当我们尝试在Flink表中没有主键时,会出现异常
org.apache.flink.table.api.TableException:表接收器'default_catalog。default_database。Correlation”不支持使用节点Join产生的更新和删除更改(joinType =[InnerJoin],where =[($f3 = id)],select =[graph,$f3,id,type,startDate,hascontextEntityId,providedBy,harvestedDate],leftInputSpec =[NoUniqueKey],rightInputSpec =[NoUniqueKey])。Apache退缩table。计划者计划优化程序。FlinkChangelogModeInferenceProgram $SatisfyModifyKindSetTraitVisitor。createNewNode(FlinkChangelogModeInferenceProgram. scala:405)在org.Apache退缩table。计划者计划优化程序。FlinkChangelogModeInferenceProgram $SatisfyModifyKindSetTraitVisitor。访问(FlinkChangelogModeInferenceProgram. scala:277)在org.Apache退缩table。计划者计划优化程序。FlinkChangelogModeInferenceProgram $SatisfyModifyKindSetTraitVisitor。visitChild(FlinkChangelogModeInferenceProgram. scala:FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram. scala:FlinkChangelogModeInferenceProgram $SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram. scala:354)at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike. scala:233)在scala。收藏。不可变射程foreach(Range. scala:155)在 www.example.com (TraversableLike.scala:233) at scala.collection.TraversableLike.map $(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map (Traversable.scala:104)
对此有任何修复或建议吗
我只是好奇,有没有什么建议可以让我们跳过upsert语义,在Mongo中执行plain insert
1条答案
按热度按时间mrzz3bfm1#
您的查询结果是一个更新日志流,而不是一个只附加的流。如果您已经从接收器定义中删除了主键,它会尝试将changelog流发送到仅附加接收器,但这是不兼容的。所以你需要修正你的查询,使它产生一个只附加的流,而不是一个更新日志流。