我有一个用例,我曾经通过流(kinesis)获取数据,并希望对其执行一些转换。在转换过程中,我需要查找mongodb/documentdb/redis,其中存储了我们的参考数据。我正在通过apachebeam实现这个用例。
我需要使用这些数据库(mongodb/documentdb/redis)中的集合/表作为辅助输入,这样我就可以加载它一次并从那里进行查找。我想一次加载所有记录,并使其可用于侧输入。
我试过了,但是我的错误率越来越低了-
样本代码-
final PCollectionView<Map<String, String>> mongoInput = pipeline.apply("Read from MongoDB", MongoDbIO.read().withUri("mongodb://URL")
.withDatabase("dbname").withCollection("collection_name"))
.apply("Document to String", ParDo.of(new MongoToKeyValueDoFn()))
.apply("create a view for side input", View.<String, String>asMap());
在mongotokeyvaluedofn类中,我将键和值放入hashmap并发出它。
错误-
The method apply(String, PTransform<? super PCollection<Map<String,String>>,OutputT>) in the type PCollection<Map<String,String>> is not applicable for the arguments (String, View.AsMap<String,String>)
有人能告诉我在输入端加载数据吗?
1条答案
按热度按时间7uhlpewt1#
从查看的文档中:
如果已知kv<k,v>的pcollection对每个键的每个窗口都有一个值,则使用asmap()将其作为Map查看<k,v>
这里的问题是你用
MongoToKeyValueDoFn
输出aPCollection<Map<String, String>>
,即包含Map的pcollection,而asmap期望PCollection<String, String>
,即字符串键和字符串值的p集合。这里的解决办法是调整
MongoToKeyValueDoFn
为了遵循预期的格式,它应该输出键值对,而不是将它们存储在hashmap中。