我尝试在apachebeam中执行查找操作,并将查找数据作为侧输入发送,这会减慢我的工作速度。我正在对csv文件的每一行的特定列执行查找操作,并将查找Map作为侧输入传递。我用spark做跑步者。我的工作需要最初的3分钟,只是分发侧输入。查找数据大约是170MB,看起来并不多。我的实现看起来像
Map<String, Map<Map<String, String>, Map<String, String>>> hm = \\getting data from external source
LookUpData lookUpData = new LookUpData(hm); \\Simple data class which implements serializable
PCollectionView<LookUpData> filedata = pipeline.apply("create side input", Create.of(lookUpData)).apply(View.asSingleton());
此pcollectionview后来作为侧输入传递。
在阅读了这篇文章googledataflow/apachebeam之后,来自pcollection的python端输入会降低性能。尝试用类似的方式在java中实现
PCollectionView<Map<String, Map<Map<String, String>, Map<String, String>>>> fileData = pipeline.apply("create side input", Create.of(hm)).apply(View.asMap());
我得到以下错误
java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was specified. Please set a coder by invoking Create.withCoder() explicitly or a schema by invoking Create.withSchema()
at org.apache.beam.sdk.transforms.Create$Values.expand(Create. java: 358)
at org.apache.beam.sdk.transforms.Create$Values.expand(Create. java: 272)
at org.apache.beam.sdk.Pipeline. applyInternal(Pipeline. java:542)
at org.apache. beam. sdk.Pipeline.applyTransform(Pipeline. java: 493)
at org.apache. beam. sdk.values.PBegin. apply(PBegin. java: 56)
at org.apache. beam. sdk.Pipeline.apply(Pipeline. java:186)
at com.hsbc.eod.rule.processor.pipeline.RuleProcessorPipeline.run(RuleProcessorPipeline. java: 108)
at com.hsbc.eod.rule.processor.Main.main(Main.java:61)
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Unable to provide a Coder for java.util.HashMap$Node.
Building a Coder using a registered CoderProvider failed.
See suppressed exceptions for detailed failures.
at org.apache.beam.sdk.coders.CoderRegistry.getCoderFromFactories(CoderRegistry. java: 690)
at org.apache.beam.sdk.coders.CoderRegistry.getCoderFromTypeDescriptor(CoderRegistry. java:619)
at org.apache. beam. sdk.coders.CoderRegistry.getCoder(CoderRegistry. java: 251)
at org.apache. beam. sdk.coders.CoderRegistry.getCoder(CoderRegistry. java: 242)
at org.apache.beam.sdk.transforms.Create. inferCoderFromObject (Create. java: 808)
at org.apache. beam. sdk. transforms .Create.inferCoderFromObjects(Create. java: 740}
at org.apache.beam.sdk.transforms.Create. inferCoderFromObject (Create. java: 802)
at org.apache. beam. sdk.transforms.Create.inferCoderFromObjects(Create. java: 746)
at org.apache.beam.sdk.transforms.Create. inferCoderFromObject (Create. java:801)
at org.apache.beam.sdk.transforms.Create. inferCoderFromObject (Create. java: 806)
at org.apache. beam. sdk.transforms.Create.inferCoderFromObjects(Create. java: 746)
at org.apache. beam. sdk.transforms.Create.getDefaultCreateCoder(Create. java: 728)
at org.apache.beam. sdk. transforms .Create.access$300(Create. java:105)
at org.apache.beam.sdk.transforms.Create$Values.expand(Create. java: 354)
Suppressed: org.apache.beam.sdk.coders.CannotProvideCoderException: java.util.HashMap$Node is not one of the common types.
at org.apache. beam. sdk.coders.CoderRegistry$CommonTypes.coderFor(CoderRegistry. java:151)
at org.apache.beam.sdk.coders.CoderRegistry.getCoderFromFactories(CoderRegistry. java:674)
Suppressed: org.apache.beam.sdk.coders.CannotProvideCoderException: Class java.util.HashMap$Node does not have a @DefaultCoder annotation.
at org.apache. beam. sdk.coders.DefaultCoder$DefaultCoderProviderRegistrar$DefaultCoderProvider.coderFor(DefaultCoder. java:89)
at org.apache.beam.sdk.coders.CoderRegistry.getCoderFromFactories(CoderRegistry. java:674)
我试着用 withCoder(KVCoder.of(.....))
但获取嵌套Map时出错。如果这不是实现侧输入的正确方法,那么有人能告诉我正确的方法,以便减少最初3分钟的时间。
只是补充一下,我还看到spark history server上gc时间(红色)增加了。
暂无答案!
目前还没有任何答案,快来回答吧!